Как использовать flatMapConcat?

#scala #akka-stream

#scala #akka-поток

Вопрос:

Я пытаюсь использовать flatMapConcat как следующее:

 Source.empty
      .flatMapConcat {
        Source.fromFuture(Future("hello"))
      }
      .runWith(Sink.foreach(println))
      .onComplete {
        case Success(_) =>
          println()
        case Failure(e) =>
          println(s"Thrown ${e.getMessage}")
      }
  

и компилятор жалуется:

 Error:(31, 26) type mismatch;
 found   : akka.stream.scaladsl.Source[String,akka.NotUsed]
 required: ? => akka.stream.Graph[akka.stream.SourceShape[?],?]
        Source.fromFuture(Future("hello")) 
  

Что я делаю не так?

Ответ №1:

Метод akka.stream.Graph[akka.stream.SourceShape[T],M]):FlowOps.this.Repr[T]» rel=»nofollow noreferrer»>flatMapConcat имеет следующую подпись:

 def flatMapConcat[T, M](f: (Out) => Graph[SourceShape[T], M]): Repr[T]
  

который, в случае обработки Source из String s, ожидал бы функцию типа:

 f: String => Source(Iterable[String])
  

Другая проблема с вашим примером кода заключается в том, что Source.empty[T] в нем нет элементов для обработки, следовательно, последующие flatMapConcat никогда не будут выполнены.

Вот пример использования flatMapConcat для преобразования каждого элемента из Source имен:

 import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()

Source(List("alice", "bob", "jenn")).
  flatMapConcat{ name => Source(List(s"Hi $name", s"Bye $name")) }.
  runWith(Sink.foreach(println))
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi jenn
// Bye jenn
  

В качестве примечания можно было бы заменить flatMapConcat в приведенном выше примере на scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]» rel=»nofollow noreferrer»>mapConcat, который ожидает более простой подписи функции:

 Source(List("alice", "bob", "jenn")).
  mapConcat{ name => List(s"Hi $name", s"Bye $name") }.
  runWith(Sink.foreach(println))