#java #akka #akka-stream
#java #akka #akka-stream
Вопрос:
Я использую AKKA streamRefs (версия 2.5.32) с java API.
Я попытался построить график (используя GraphDSL или классы PartitionHub), в котором я могу передавать некоторые данные циклическим способом и по его завершении транслировать на все серверы сообщение «ГОТОВО», идентифицирующее завершение потока с использованием monitorMat
и watchTermination
не сработало для меня.
Я использовал документацию AKKA для построения графика, в котором я передаю нескольким приемникам некоторые данные следующим образом:
(с помощью watchTermination
)
RunnableGraph<Source<EventAccessCollection, NotUsed>> runnableGraph =
source.toMat(PartitionHub.ofStateful(EventAccessCollection.class, () -> new RoundRobin<EventAccessCollection>(),
workersSinks.size()), Keep.right());
Source<EventAccessCollection, NotUsed> fromProducer = runnableGraph.run(mat);
for (Sink<EventAccessCollection, NotUsed> w:workersSinks) {
fromProducer.runWith(w,mat);
}
fromProducer.watchTermination((
(mat,completionStage)->
completionStage.toCompletableFuture().thenRun(() -> {
broadcastMessage(FINISH);
})
));
Документацию можно найти здесь:
https://doc.akka.io/docs/akka/2.5.32/stream/stream-dynamic.html
Кажется, что, поскольку SinkRef должен быть такого типа <T,NotUsed>
, я не могу использовать его для получения отзывов о потоке (в отличие от CompletionStage<Done>
)
Еще одна мысль, которую я имел в виду, заключалась в том, что, возможно, потоковая передача / отправка сообщений из разных графиков может быть определена последовательно, чтобы они поступали получателям в том порядке, в котором они были отправлены.
Как я могу принудительно упорядочить потоки / сообщения, которые я хочу отправить получателям?
Комментарии:
1. Что вы подразумеваете под «принудительным порядком»? Сообщения внутри графа потоков akka обрабатываются последовательно. Вне графика потока akka вы должны решить, используя любую логику, которую вы хотите.
2. Допустим, вы хотите передать коллекцию целых чисел в несколько приемников, а затем коллекцию строк в одни и те же приемники. Как вы можете это сделать? не могли бы вы привести пример, пожалуйста?
3. Этот пример в официальном документе akka stream ( doc.akka.io/docs/akka/current/stream/operators/Broadcast.html ) выполняет широковещательную передачу на 3 приемника с именами count, min и max.