Добавление операций потока к приемнику

#akka-stream

#akka-stream

Вопрос:

У меня есть источник, который мне нужно преобразовать в другой. По пути я также хочу подключиться к потоку и отправить его в приемник. Это хорошо работает с alsoTo и WireTap.

Однако я хочу выполнить дополнительные преобразования данных до того, как они достигнут приемника:

 Source ---> map ---> scan ---> map ---> return as Source
                           
                             alsoTo
                             
                               ---> map --> Sink
  

В документах я прочитал:

Можно присоединить поток к источнику, в результате чего получится составной источник, и также можно добавить поток к приемнику, чтобы получить новый приемник.

Но я не смог найти пример, чтобы добавить такой поток к приемнику, например, выполнить операцию «map» перед пересылкой в приемник.

ОЧЕНЬ упрощенный пример:

 val sink = Sink.foreach(println)
val source = Source(List(1, 2, 3, 4, 5))
val transformed_source = source.map(n => n * 10).alsoTo(sink).map(n => n   1)

// but I want something along the lines of
alsoTo(map(n => "The result is "   n) ~> sink)
  

Ответ №1:

Я думаю Flow.to это то, что я ищу.

 def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat]
  

Подключите этот поток к приемнику, объединив этапы обработки
оба.