takeUntil с фильтром

#typescript #rxjs #nestjs

#typescript #rxjs #nestjs

Вопрос:

У меня есть некоторый код в NestJS с оператором mergeMap, где мне нужно остановить поток наблюдаемых на основе некоторых значений из внешних и внутренних наблюдаемых. Рассмотрим этот пример:

 type SomeEvent1 {
  value: string;
}

type SomeEvent2 {
  value: string;
}

@Injectable()
export class TradeSagas {
  private readonly logger = new MyLogger(TradeSagas.name);
  @Saga()
  someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
    return events$.pipe(
      ofType(SomeEvent1),
      mergeMap((event: SomeEvent1) => {
        events$.pipe(
          ofType(SomeEvent2),
          map((event) => new SomeEvent3())
        );
      })
    );
  };
 

Мне нужно иметь возможность отказаться от подписки на поток, который прослушивается SomeEvent1 , или на поток, который прослушивается SomeEvent2 (пока не уверен, что нужно для моего конкретного варианта использования), когда, например, SomeEvent1.value === SomeEvent2.value .

Я знаю, что мог бы использовать оператор takeUntil, но он принимает другой наблюдаемый, поэтому я не понимаю, как я могу заставить его работать.

Причина, по которой я хочу отказаться от подписки, заключается в том, что mergeMap отслеживает все предыдущие значения событий, и иногда это нарушает поток. Я знаю, что мог бы использовать switchMap, но это не решение моей конкретной проблемы, поскольку мне не нужно отменять внешнюю наблюдаемую только потому, что появилось новое событие — у меня немного более сложная логика для построения потока событий. Если есть другие идеи о том, как справиться с этой проблемой, пожалуйста, дайте мне знать.

Спасибо за любую помощь.

Ответ №1:

Для вашего случая takeWhile это был бы лучший выбор, поскольку он принимает функцию, а не наблюдаемый:

   someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
    return events$.pipe(
      ofType(SomeEvent1),
      mergeMap(event1 => events$.pipe(
          ofType(SomeEvent2),
          takeWhile(event2 => event2.value === event1.value),
          map(event2 => new SomeEvent3())
        );
      })
    );
  };
 

Редактировать:

Иногда проще следовать, если вы разбиваете источники на отдельные переменные. Поэтому я думаю, что это может быть то, что вы ищете:

 const event1$ = events$.pipe(ofType(SomeEvent1));
const event2$ = events$.pipe(ofType(SomeEvent2));
const event3$ = events$.pipe(ofType(SomeEvent3));

const someSaga2 = events1$.pipe(
  mergeMap(event1 => events2$.pipe(
    takeUntil(events3$.pipe(
        filter(event3 => event3.value === event1.value)
    ))
  ))
);
 

Описание поведения: при Event1 отправке, отправляйте Event2 до Event3 тех пор, пока не будет отправлено значение, которое имеет то value же Event1 значение, что и .

Комментарии:

1. Большое спасибо! Да, это работает для сценария, который я объяснил. Теперь я понял, что плохо справился с описанием варианта использования, который у меня есть. На самом деле я хотел отменить поток SomeEvent1 -> SomeEvent2 в случае, если существует третий поток SomeEvent3, который генерирует события того же типа, поэтому я хочу остановить поток, если event1.value === event3.value — в противном случае продолжайте слияние с потоком SomeEvent2. Имеет ли это смысл?

2. Пример кода не прослушивал события типа Event3 , верно? Вы говорите, что после event 1 отправки, прослушивайте event 2 до event 3 тех пор, пока не будет отправлено?

3. Да, правильно! Прослушивайте event 2 , пока event 3 не выдаст.

4. Хорошо, ответ обновлен. Есть шанс , что это может сработать для вас 🙂

5. Потрясающе! Я думаю, что это оно