Реактор — однопоточная передача состояния

#java #multithreading #kotlin #thread-safety #project-reactor

Вопрос:

Я создаю конвейер обработки, который использует события, применяет преобразование с сохранением состояния и сохраняет результат в базе данных.

Чтобы быть более конкретным, потребитель считывает серию событий, затем мне нужно удерживать их (в памяти) до тех пор, пока не наступят определенные события (так называемые «конец пакета»), а затем преобразовать их. Преобразование выполняется с учетом состояния — логика зависит от порядка событий, полученных ранее.

Это можно описать следующим псевдокодом Котлина:

 val consumer: QueueConsumer<Event> = Consumer()
consumer.read()
   .concatMap { events: List<Event> -> Monojust(transformer.transform(events)}
   .concatMap {db.saveAll(it}

 

Мой вопрос в том, как подойти к потоковой передаче transformer . Моя идея состоит в том, чтобы сделать его однопоточным, чтобы избежать синхронизации.

Потребитель-это один поток, занятый включением и чтением файловой системы, поэтому transformer.transform(events) он всегда выполняется в потоке потребителя.

На самом деле, я считал эту конструкцию не очень надежной, поскольку она зависит от потоковой передачи вышестоящего оператора и работает только в предположении, что она однопоточная.

Есть ли лучший способ сделать это?

Комментарии:

1. Рассматривали ли вы возможность использования метода bufferUntil потока? Это позаботится об управлении государством, так что вам не придется этого делать.