#java #multithreading #kotlin #thread-safety #project-reactor
Вопрос:
Я создаю конвейер обработки, который использует события, применяет преобразование с сохранением состояния и сохраняет результат в базе данных.
Чтобы быть более конкретным, потребитель считывает серию событий, затем мне нужно удерживать их (в памяти) до тех пор, пока не наступят определенные события (так называемые «конец пакета»), а затем преобразовать их. Преобразование выполняется с учетом состояния — логика зависит от порядка событий, полученных ранее.
Это можно описать следующим псевдокодом Котлина:
val consumer: QueueConsumer<Event> = Consumer()
consumer.read()
.concatMap { events: List<Event> -> Monojust(transformer.transform(events)}
.concatMap {db.saveAll(it}
Мой вопрос в том, как подойти к потоковой передаче transformer
. Моя идея состоит в том, чтобы сделать его однопоточным, чтобы избежать синхронизации.
Потребитель-это один поток, занятый включением и чтением файловой системы, поэтому transformer.transform(events)
он всегда выполняется в потоке потребителя.
На самом деле, я считал эту конструкцию не очень надежной, поскольку она зависит от потоковой передачи вышестоящего оператора и работает только в предположении, что она однопоточная.
Есть ли лучший способ сделать это?
Комментарии:
1. Рассматривали ли вы возможность использования метода bufferUntil потока? Это позаботится об управлении государством, так что вам не придется этого делать.