RxJS, какой оператор использовать для «OnIdle»?

#rxjs #reactive #redux-observable

#rxjs #реактивный #redux-наблюдаемый

Вопрос:

Мой вариант использования следующий — у меня есть поток операций для разных элементов, и я хочу вызывать «commit» только для каждого объекта, если они простаивали в течение определенного периода времени или получен другой элемент.

Я пытался использовать groupBy и debounce, но не охватил все случаи — например

 action.pipe(
  groupBy(item -> item.key),
  debounceTime(1000),
  mergeMap(item -> { 
           item.commit()})
)
  

Комментарии:

1. Что вы подразумеваете под «бездействием»? Как проверить, был ли объект простаивающим?

2. idle как в «не получил событие за X миллис»

3. Я не уверен, правильно ли я вас понимаю. Итак, вы хотите вызвать commit элемент, отправленный action , если никакой другой элемент с тем же ключом не отправлен в пределах X миллис или если отправлен другой элемент с другим ключом?!

4. я хочу вызвать фиксацию для последнего принятого элемента, если либо элемент с другим ключом поступает, ЛИБО событие не поступает в течение заданного времени. A => A => B => A должно привести к: A.фиксация() => B.фиксация() => <тайм-аут> => A.фиксация()

Ответ №1:

Я не уверен, какова ваша цель:

Давайте возьмем пример ситуации, когда у вас A => B => A время ожидания меньше минимального

Вариант 1: каждый тип элемента должен иметь свое собственное состояние ожидания — второе излучение типа A будет проигнорировано
Вариант 2. поскольку нет последовательной последовательности, вторая A не будет проигнорирована

ВАРИАНТ 1 пример:

 action.pipe(
    groupBy(item => item.key),
    mergeMap(group => group.pipe(debounceTime(1000))),
    mergeMap(item => item.commit())
)
  

Необязательно:

 const IDLE_TIME = XXXX;
action.pipe(
    groupBy(item => item.key),
    mergeMap(group => merge(
        group.pipe(first()),
        group.pipe(
            timeInterval(),
            filter(x => x.interval > IDLE_TIME),
            map(x => x.value)
        )
    )),
    mergeMap(item => item.commit())
)
  

ПРИМЕР ВАРИАНТА 2:

  action.pipe(
     pairwise(),
     debounce(([previous, current]) => previous.key == current.key? timer(1000) : EMPTY),
     map(([pre, current]) => current),
     mergeMap(item => item.commit())
 )
  

Ответ №2:

Вы можете оценить природу простоя, используя время проверки, сканирование и фильтр

 action.pipe(
  //add the idle property to the item
  map(item => ({ ...item, idle: false})),
  //audit the stream each second
  auditTime(1000),
  //then use scan to with previous emission at audit time
  scan(
     (prev, curr) => {
       //then if the key remains the same then we have an idle state
       if (prev.key === curr.key) {
          //return changed object to indicate we have an idle state
          return Object.assign({}, curr, {idle: true});
       } else {
          //otherwise just return the non idle item
          return curr 
       }
    //seed with an object that cannot match the first emission key
    }, { key: null }
  ),
  //then filter out all emissions indicated as not idle
  filter(item => item.idle === true)
  //and commit
  mergeMap(item => item.commit())
)
  

Затем вы можете использовать distinctUntilKeyChanged для достижения второго условия

 action.pipe(
  distinctUntilKeyChanged('key'),
  mergeMap(item => item.commit())
)
  

Я не знаком с, redux-observable но вы обычно merge используете эти две наблюдаемые величины, а затем фиксируете в конце.

Комментарии:

1. Я не уверен, что это разумно использовать auditTime здесь

2. ОК. Вероятно, лучше всего подходит для любых читателей, если вы укажете причину?

3. также подумайте о случае, когда у вас есть 3 элемента A -> B -> A, ваш код знает только о предыдущем.

4. Также не с distinctUntilKeyChanged выбросами.

5. как кто distinctUntilKeyChanged мог бы помочь вам в вышеупомянутом случае?