#rxjs #reactive #redux-observable
#rxjs #реактивный #redux-наблюдаемый
Вопрос:
Мой вариант использования следующий — у меня есть поток операций для разных элементов, и я хочу вызывать «commit» только для каждого объекта, если они простаивали в течение определенного периода времени или получен другой элемент.
Я пытался использовать groupBy и debounce, но не охватил все случаи — например
action.pipe(
groupBy(item -> item.key),
debounceTime(1000),
mergeMap(item -> {
item.commit()})
)
Комментарии:
1. Что вы подразумеваете под «бездействием»? Как проверить, был ли объект простаивающим?
2. idle как в «не получил событие за X миллис»
3. Я не уверен, правильно ли я вас понимаю. Итак, вы хотите вызвать
commit
элемент, отправленныйaction
, если никакой другой элемент с тем же ключом не отправлен в пределах X миллис или если отправлен другой элемент с другим ключом?!4. я хочу вызвать фиксацию для последнего принятого элемента, если либо элемент с другим ключом поступает, ЛИБО событие не поступает в течение заданного времени. A => A => B => A должно привести к: A.фиксация() => B.фиксация() => <тайм-аут> => A.фиксация()
Ответ №1:
Я не уверен, какова ваша цель:
Давайте возьмем пример ситуации, когда у вас A => B => A время ожидания меньше минимального
Вариант 1: каждый тип элемента должен иметь свое собственное состояние ожидания — второе излучение типа A будет проигнорировано
Вариант 2. поскольку нет последовательной последовательности, вторая A
не будет проигнорирована
ВАРИАНТ 1 пример:
action.pipe(
groupBy(item => item.key),
mergeMap(group => group.pipe(debounceTime(1000))),
mergeMap(item => item.commit())
)
Необязательно:
const IDLE_TIME = XXXX;
action.pipe(
groupBy(item => item.key),
mergeMap(group => merge(
group.pipe(first()),
group.pipe(
timeInterval(),
filter(x => x.interval > IDLE_TIME),
map(x => x.value)
)
)),
mergeMap(item => item.commit())
)
ПРИМЕР ВАРИАНТА 2:
action.pipe(
pairwise(),
debounce(([previous, current]) => previous.key == current.key? timer(1000) : EMPTY),
map(([pre, current]) => current),
mergeMap(item => item.commit())
)
Ответ №2:
Вы можете оценить природу простоя, используя время проверки, сканирование и фильтр
action.pipe(
//add the idle property to the item
map(item => ({ ...item, idle: false})),
//audit the stream each second
auditTime(1000),
//then use scan to with previous emission at audit time
scan(
(prev, curr) => {
//then if the key remains the same then we have an idle state
if (prev.key === curr.key) {
//return changed object to indicate we have an idle state
return Object.assign({}, curr, {idle: true});
} else {
//otherwise just return the non idle item
return curr
}
//seed with an object that cannot match the first emission key
}, { key: null }
),
//then filter out all emissions indicated as not idle
filter(item => item.idle === true)
//and commit
mergeMap(item => item.commit())
)
Затем вы можете использовать distinctUntilKeyChanged для достижения второго условия
action.pipe(
distinctUntilKeyChanged('key'),
mergeMap(item => item.commit())
)
Я не знаком с, redux-observable
но вы обычно merge
используете эти две наблюдаемые величины, а затем фиксируете в конце.
Комментарии:
1. Я не уверен, что это разумно использовать
auditTime
здесь2. ОК. Вероятно, лучше всего подходит для любых читателей, если вы укажете причину?
3. также подумайте о случае, когда у вас есть 3 элемента A -> B -> A, ваш код знает только о предыдущем.
4. Также не с
distinctUntilKeyChanged
выбросами.5. как кто
distinctUntilKeyChanged
мог бы помочь вам в вышеупомянутом случае?