#typescript #rxjs #nestjs
#typescript #rxjs #nestjs
Вопрос:
У меня есть некоторый код в NestJS с оператором mergeMap, где мне нужно остановить поток наблюдаемых на основе некоторых значений из внешних и внутренних наблюдаемых. Рассмотрим этот пример:
type SomeEvent1 {
value: string;
}
type SomeEvent2 {
value: string;
}
@Injectable()
export class TradeSagas {
private readonly logger = new MyLogger(TradeSagas.name);
@Saga()
someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
return events$.pipe(
ofType(SomeEvent1),
mergeMap((event: SomeEvent1) => {
events$.pipe(
ofType(SomeEvent2),
map((event) => new SomeEvent3())
);
})
);
};
Мне нужно иметь возможность отказаться от подписки на поток, который прослушивается SomeEvent1
, или на поток, который прослушивается SomeEvent2
(пока не уверен, что нужно для моего конкретного варианта использования), когда, например, SomeEvent1.value === SomeEvent2.value .
Я знаю, что мог бы использовать оператор takeUntil, но он принимает другой наблюдаемый, поэтому я не понимаю, как я могу заставить его работать.
Причина, по которой я хочу отказаться от подписки, заключается в том, что mergeMap отслеживает все предыдущие значения событий, и иногда это нарушает поток. Я знаю, что мог бы использовать switchMap, но это не решение моей конкретной проблемы, поскольку мне не нужно отменять внешнюю наблюдаемую только потому, что появилось новое событие — у меня немного более сложная логика для построения потока событий. Если есть другие идеи о том, как справиться с этой проблемой, пожалуйста, дайте мне знать.
Спасибо за любую помощь.
Ответ №1:
Для вашего случая takeWhile
это был бы лучший выбор, поскольку он принимает функцию, а не наблюдаемый:
someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
return events$.pipe(
ofType(SomeEvent1),
mergeMap(event1 => events$.pipe(
ofType(SomeEvent2),
takeWhile(event2 => event2.value === event1.value),
map(event2 => new SomeEvent3())
);
})
);
};
Редактировать:
Иногда проще следовать, если вы разбиваете источники на отдельные переменные. Поэтому я думаю, что это может быть то, что вы ищете:
const event1$ = events$.pipe(ofType(SomeEvent1));
const event2$ = events$.pipe(ofType(SomeEvent2));
const event3$ = events$.pipe(ofType(SomeEvent3));
const someSaga2 = events1$.pipe(
mergeMap(event1 => events2$.pipe(
takeUntil(events3$.pipe(
filter(event3 => event3.value === event1.value)
))
))
);
Описание поведения: при Event1
отправке, отправляйте Event2
до Event3
тех пор, пока не будет отправлено значение, которое имеет то value
же Event1
значение, что и .
Комментарии:
1. Большое спасибо! Да, это работает для сценария, который я объяснил. Теперь я понял, что плохо справился с описанием варианта использования, который у меня есть. На самом деле я хотел отменить поток SomeEvent1 -> SomeEvent2 в случае, если существует третий поток SomeEvent3, который генерирует события того же типа, поэтому я хочу остановить поток, если event1.value === event3.value — в противном случае продолжайте слияние с потоком SomeEvent2. Имеет ли это смысл?
2. Пример кода не прослушивал события типа
Event3
, верно? Вы говорите, что послеevent 1
отправки, прослушивайтеevent 2
доevent 3
тех пор, пока не будет отправлено?3. Да, правильно! Прослушивайте
event 2
, покаevent 3
не выдаст.4. Хорошо, ответ обновлен. Есть шанс , что это может сработать для вас 🙂
5. Потрясающе! Я думаю, что это оно