Как динамически маршрутизировать потоки RxJS

#rxjs #rxjs5

#rxjs #rxjs5

Вопрос:

У меня есть поток данных с автомобильных аукционов. Каждый автомобильный аукцион имеет n-количество полос. Я хочу зарегистрировать аукцион каждого транспортного средства.

Поток выглядит примерно так…

—{полоса: 1, действие: ставка} — { полоса: 2, действие: старт} — { полоса: 1, действие: ставка} — {полоса: 2, действие: ставка} — {полоса: 1, действие: продано} —

У меня есть следующее, чтобы буферизировать каждую полосу аукциона и закрыть буфер при продаже…

 const bufferOpen$= auctionWebSocketStream$
    .filter(stream => stream.tag === 'CURITEM');

const bufferClose$ = () => auctionWebSocketStream$.filter(stream => stream.tag === 'SOLD');

auctionWebSocketStream$
  .bufferToggle(bufferOpen$, bufferClose$)
  .subscribe(x => console.log(x));
  

Вышесказанное работает нормально, пока есть один аукцион и одна полоса. С несколькими дорожками есть информация о ставках / продажах о нескольких дорожках.

Как мне объединить поток по дорожкам в буфер? Подобные решения всегда имели известные параметры агрегации. Но мне нужно разделять поток в любое время, когда появляется новая полоса.

Помощь приветствуется.

Обновить

Я создал JSBin, чтобы показать свое разочарование и невежество. Он дает пример входного потока и объясняет желаемый результат.

http://jsbin.com/tuxitev/edit?js ,консоль

(Для получения бонусных очков он показывает только пустые массивы в Babel. Не уверен, зачем требуется Typescript)

Комментарии:

1. Просто наткнулся на groupBy… Я думаю, что у меня сейчас будет момент «извините, что потратил время всех».

2. У вас есть фиксированное количество дорожек? Может ли земля повторно открыться для нового аукциона?

3. Это не исправлено и может быть открыто повторно. Например. Аукцион 1, полосы A, B, C — Аукцион 2, полосы A, B — Аукцион 1, полоса D — Аукцион 1, полоса A завершена — Аукцион 3, полоса A

4. Итак, когда он снова откроется, вы снова подписываетесь?

5. Да, потребуется повторная подписка. Это немного сложный случай, и если это вызывает много проблем, я могу использовать версию без повторной подписки.

Ответ №1:

Если кто-то знает, где получить ответы на вопросы RxJS, пожалуйста, дайте мне знать. Я приму ответ. Это третий оставшийся без ответа вопрос RxJS, который у меня был.

Для тех, кто интересуется ответом, вот он.

 stream$
.groupBy(stream => stream.lane)
.mergeMap(stream =>
        stream.scan((acc, cur) => {
                if (cur.action === 'start') {
                    acc = [];
                }
                acc.push(cur)
                return acc;
        }, [])
        .filter(stream => stream[stream.length-1].action === 'sold')
)
.subscribe(
    x => console.log(x), 
    (e) => console.error(e), 
    () => console.log('complete')
)
  

http://jsbin.com/tuxitev/edit?js ,консоль