Что такое эквивалент Akka streams .via() в Project Reactor?

#java #scala #project-reactor #akka-stream

#java #scala #проект-реактор #akka-stream

Вопрос:

Я как бы борюсь с документацией Project Reactor. У меня есть некоторый опыт работы с Akka Streams, но сейчас я работаю над проектом, который использует Project Reactor.

Мне нужен оператор реактора, который может принимать последовательность для передачи сообщения. Он должен вести себя аналогично оператору .via() в Akka Streams .

Например, скажем, у нас есть последовательность: A -> B -> C, и мне нужно ввести последовательность X1 -> X2 -> X3 после шага B. Таким образом, конечная последовательность будет A -> B -> X1 -> X2 -> X3 -> C.

Существует ли что-то подобное в Reactor?

Ответ №1:

Закрытие via в Project Reactor — это метод преобразования.

Итак, в Akka скажем, у вас есть этот график:

 Source.single(10)
      .map(_ * -1) //some mapping
      .runWith(Sink.ignore)
 

и тогда у вас есть этот поток:

 val flow = Flow[Int].map(_ * 2)
 

вы можете подключить этот поток к своему графику следующим образом:

 Source.single(10)
      .map(_ * -1)
      .via(flow)
      .runWith(Sink.ignore)
 

Эквивалентом в Project Reactor будет это:

Наличие графика:

  Flux.just(10)
     .map(x -> x * -1)
     .subscribe();
 

и метод преобразования a Flux<Integer> в Publisher<Integer> :

 public static class Transformers
{
  public static Publisher<Integer> flow(Flux<Integer> f)
  {
    return f.map(x -> x * 2);
  }
}
 

вы можете подключить этот метод к своему графику следующим образом:

 Flux.just(10)
    .map(x -> x * -1)
    .transform(Transformers::flow)
    .subscribe();
 

Я написал сообщение об этом и других различиях между двумя API, возможно, вы найдете его полезным. Это сообщение от 2019 года, и API развиваются. Например, я упоминаю compose метод в контексте Flux , который был переименован с transformDeferred тех пор, как я написал это, я не уверен, что еще изменилось с тех пор, как я написал сообщение, поэтому имейте в виду: Akka Streams vs Project Reactor API