Поток завершается после добавления потока

#akka #akka-stream #reactive-streams

#akka #akka-stream #реактивные потоки

Вопрос:

У меня есть простой поток, объявленный следующим образом:

 Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(30), Files.list(rootDir).collect(Collectors.toList()))
                .mapConcat(files -> files)
                .log("scanning logs")
                .via(logsFlow.create())
                .via(kafkaFlow.create())
//                .via(archiveFlow.create())
                .runWith(Sink.ignore(), materializer)
                .whenComplete((a, b) -> {
                    log.info("done");
                });
  

С закомментированным archiveFlow все работает так, как ожидалось. Но когда я добавляю дополнительный поток, независимо от того, является ли это потоком архива или каким-то простым потоком, подобным этому:

 .via(Flow.of(Path.class).map(path -> {
                    log.info("foo");
                    return path;
                }))
  

поток завершается после первого тика.
Почему это?

 2019-03-20 21:35:09.292 DEBUG 50089 --- [lt-dispatcher-2] a.kafka.internal.DefaultProducerStage    : Stage completed
2019-03-20 21:35:09.294 DEBUG 50089 --- [lt-dispatcher-4] akka.stream.Materializer                 : [scanning logs] Downstream finished.
2019-03-20 21:35:09.296  INFO 50089 --- [onPool-worker-3] com.example.MyStream  : done
  

Ответ №1:

Оказалось, что Akka проглотила ошибку. Я использовал стратегию супервизии, и теперь все работает нормально.

Комментарии:

1. рекомендуется всегда использовать recover для потока — doc.akka.io/docs/akka/2.5.5/scala/stream/stream-error.html