фолд не работает должным образом с использованием потоков akka

#java #akka #akka-stream

Вопрос:

Ниже приведен код, который я написал, чтобы попытаться вывести сумму каждого полученного сообщения akka, которое было отредактировано из этого руководства :

https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html

 import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class AkkaSourceTesting {

    public static void main(String args[]){

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty() , "actorSystem");

        Source<Integer, ActorRef> matValuePoweredSource =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        100,
                        OverflowStrategy.fail());

        Pair<ActorRef, Source<Integer, NotUsed>> actorRefSourcePair =
                matValuePoweredSource.preMaterialize(actorSystem);

        actorRefSourcePair.first().tell(1, ActorRef.noSender());
        actorRefSourcePair.first().tell(1, ActorRef.noSender());
        actorRefSourcePair.first().tell(1, ActorRef.noSender());

        Flow<Integer, Integer, NotUsed> groupedFlow = Flow.of(Integer.class)
                .grouped(2)
                .map(value -> {
                    List<Integer> newList = new ArrayList<>(value);
                    return newList;
                })
                .mapConcat(value -> value);

        // pass source around for materialization
        actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res   element)).runWith(Sink.foreach(System.out::println), actorSystem);

    }
}
 

Операция складывания, похоже, ничего не выводит на консоль.

Однако, если я использую

 actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).map(x -> x * 2)).runWith(Sink.foreach(System.out::println), actorSystem);
 

вместо

 actorRefSourcePair.second().via(Flow.of(Integer.class).via(groupedFlow).fold(0, (res, element) -> res   element)).runWith(Sink.foreach(System.out::println), actorSystem);
    
 

Затем выводится следующее :

 2
2
 

Я пытаюсь сгруппировать список и выполнить операцию свертки для каждой из групп, но свертка даже не выполняется. Я пропустил шаг?

Ответ №1:

Flow.fold не выдает значение до завершения восходящего потока.

Также обратите внимание, что ваш groupedFlow поток идентификации: его можно удалить, ничего не меняя:

  • grouped принимает каждую последующую пару элементов и объединяет их в List
  • map Этап преобразует это List в ArrayList
  • mapConcat разворачивает ArrayList и выдает элементы

Самое четкое выражение того, что вы ищете (поток суммы пар последовательных групп из 2) в Java, вероятно, выглядит следующим образом

 actorRefSourcePair.second()
    .grouped(2)
    .map(twoList -> twoList.stream().reduce(0, Integer::sum))
    .runWith(Sink.foreach(System.out::println), actorSystem);