#rxjs #rxjs5
#rxjs #rxjs5
Вопрос:
У меня есть поток данных с автомобильных аукционов. Каждый автомобильный аукцион имеет n-количество полос. Я хочу зарегистрировать аукцион каждого транспортного средства.
Поток выглядит примерно так…
—{полоса: 1, действие: ставка} — { полоса: 2, действие: старт} — { полоса: 1, действие: ставка} — {полоса: 2, действие: ставка} — {полоса: 1, действие: продано} —
У меня есть следующее, чтобы буферизировать каждую полосу аукциона и закрыть буфер при продаже…
const bufferOpen$= auctionWebSocketStream$
.filter(stream => stream.tag === 'CURITEM');
const bufferClose$ = () => auctionWebSocketStream$.filter(stream => stream.tag === 'SOLD');
auctionWebSocketStream$
.bufferToggle(bufferOpen$, bufferClose$)
.subscribe(x => console.log(x));
Вышесказанное работает нормально, пока есть один аукцион и одна полоса. С несколькими дорожками есть информация о ставках / продажах о нескольких дорожках.
Как мне объединить поток по дорожкам в буфер? Подобные решения всегда имели известные параметры агрегации. Но мне нужно разделять поток в любое время, когда появляется новая полоса.
Помощь приветствуется.
Обновить
Я создал JSBin, чтобы показать свое разочарование и невежество. Он дает пример входного потока и объясняет желаемый результат.
http://jsbin.com/tuxitev/edit?js ,консоль
(Для получения бонусных очков он показывает только пустые массивы в Babel. Не уверен, зачем требуется Typescript)
Комментарии:
1. Просто наткнулся на groupBy… Я думаю, что у меня сейчас будет момент «извините, что потратил время всех».
2. У вас есть фиксированное количество дорожек? Может ли земля повторно открыться для нового аукциона?
3. Это не исправлено и может быть открыто повторно. Например. Аукцион 1, полосы A, B, C — Аукцион 2, полосы A, B — Аукцион 1, полоса D — Аукцион 1, полоса A завершена — Аукцион 3, полоса A
4. Итак, когда он снова откроется, вы снова подписываетесь?
5. Да, потребуется повторная подписка. Это немного сложный случай, и если это вызывает много проблем, я могу использовать версию без повторной подписки.
Ответ №1:
Если кто-то знает, где получить ответы на вопросы RxJS, пожалуйста, дайте мне знать. Я приму ответ. Это третий оставшийся без ответа вопрос RxJS, который у меня был.
Для тех, кто интересуется ответом, вот он.
stream$
.groupBy(stream => stream.lane)
.mergeMap(stream =>
stream.scan((acc, cur) => {
if (cur.action === 'start') {
acc = [];
}
acc.push(cur)
return acc;
}, [])
.filter(stream => stream[stream.length-1].action === 'sold')
)
.subscribe(
x => console.log(x),
(e) => console.error(e),
() => console.log('complete')
)