#java #akka #akka-stream
Вопрос:
Ниже приведен код, который я написал, чтобы попытаться вывести сумму каждого полученного сообщения akka, которое было отредактировано из этого руководства :
https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class AkkaSourceTesting {
public static void main(String args[]){
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty() , "actorSystem");
Source<Integer, ActorRef> matValuePoweredSource =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
// never fail the stream because of a message
elem -> Optional.empty(),
100,
OverflowStrategy.fail());
Pair<ActorRef, Source<Integer, NotUsed>> actorRefSourcePair =
matValuePoweredSource.preMaterialize(actorSystem);
actorRefSourcePair.first().tell(1, ActorRef.noSender());
actorRefSourcePair.first().tell(1, ActorRef.noSender());
actorRefSourcePair.first().tell(1, ActorRef.noSender());
Flow<Integer, Integer, NotUsed> groupedFlow = Flow.of(Integer.class)
.grouped(2)
.map(value -> {
List<Integer> newList = new ArrayList<>(value);
return newList;
})
.mapConcat(value -> value);
// pass source around for materialization
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res element)).runWith(Sink.foreach(System.out::println), actorSystem);
}
}
Операция складывания, похоже, ничего не выводит на консоль.
Однако, если я использую
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).map(x -> x * 2)).runWith(Sink.foreach(System.out::println), actorSystem);
вместо
actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res element)).runWith(Sink.foreach(System.out::println), actorSystem);
Затем выводится следующее :
2
2
Я пытаюсь сгруппировать список и выполнить операцию свертки для каждой из групп, но свертка даже не выполняется. Я пропустил шаг?
Ответ №1:
Flow.fold
не выдает значение до завершения восходящего потока.
Также обратите внимание, что ваш groupedFlow
поток идентификации: его можно удалить, ничего не меняя:
grouped
принимает каждую последующую пару элементов и объединяет их вList
map
Этап преобразует этоList
вArrayList
mapConcat
разворачиваетArrayList
и выдает элементы
Самое четкое выражение того, что вы ищете (поток суммы пар последовательных групп из 2) в Java, вероятно, выглядит следующим образом
actorRefSourcePair.second()
.grouped(2)
.map(twoList -> twoList.stream().reduce(0, Integer::sum))
.runWith(Sink.foreach(System.out::println), actorSystem);