#scala #akka-stream
#scala #akka-поток
Вопрос:
Я пытаюсь использовать flatMapConcat
как следующее:
Source.empty
.flatMapConcat {
Source.fromFuture(Future("hello"))
}
.runWith(Sink.foreach(println))
.onComplete {
case Success(_) =>
println()
case Failure(e) =>
println(s"Thrown ${e.getMessage}")
}
и компилятор жалуется:
Error:(31, 26) type mismatch;
found : akka.stream.scaladsl.Source[String,akka.NotUsed]
required: ? => akka.stream.Graph[akka.stream.SourceShape[?],?]
Source.fromFuture(Future("hello"))
Что я делаю не так?
Ответ №1:
Метод akka.stream.Graph[akka.stream.SourceShape[T],M]):FlowOps.this.Repr[T]» rel=»nofollow noreferrer»>flatMapConcat имеет следующую подпись:
def flatMapConcat[T, M](f: (Out) => Graph[SourceShape[T], M]): Repr[T]
который, в случае обработки Source
из String
s, ожидал бы функцию типа:
f: String => Source(Iterable[String])
Другая проблема с вашим примером кода заключается в том, что Source.empty[T]
в нем нет элементов для обработки, следовательно, последующие flatMapConcat
никогда не будут выполнены.
Вот пример использования flatMapConcat
для преобразования каждого элемента из Source
имен:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
Source(List("alice", "bob", "jenn")).
flatMapConcat{ name => Source(List(s"Hi $name", s"Bye $name")) }.
runWith(Sink.foreach(println))
// Hi alice
// Bye alice
// Hi bob
// Bye bob
// Hi jenn
// Bye jenn
В качестве примечания можно было бы заменить flatMapConcat
в приведенном выше примере на scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]» rel=»nofollow noreferrer»>mapConcat, который ожидает более простой подписи функции:
Source(List("alice", "bob", "jenn")).
mapConcat{ name => List(s"Hi $name", s"Bye $name") }.
runWith(Sink.foreach(println))