#java #scala #project-reactor #akka-stream
#java #scala #проект-реактор #akka-stream
Вопрос:
Я как бы борюсь с документацией Project Reactor. У меня есть некоторый опыт работы с Akka Streams, но сейчас я работаю над проектом, который использует Project Reactor.
Мне нужен оператор реактора, который может принимать последовательность для передачи сообщения. Он должен вести себя аналогично оператору .via() в Akka Streams .
Например, скажем, у нас есть последовательность: A -> B -> C, и мне нужно ввести последовательность X1 -> X2 -> X3 после шага B. Таким образом, конечная последовательность будет A -> B -> X1 -> X2 -> X3 -> C.
Существует ли что-то подобное в Reactor?
Ответ №1:
Закрытие via
в Project Reactor — это метод преобразования.
Итак, в Akka скажем, у вас есть этот график:
Source.single(10)
.map(_ * -1) //some mapping
.runWith(Sink.ignore)
и тогда у вас есть этот поток:
val flow = Flow[Int].map(_ * 2)
вы можете подключить этот поток к своему графику следующим образом:
Source.single(10)
.map(_ * -1)
.via(flow)
.runWith(Sink.ignore)
Эквивалентом в Project Reactor будет это:
Наличие графика:
Flux.just(10)
.map(x -> x * -1)
.subscribe();
и метод преобразования a Flux<Integer>
в Publisher<Integer>
:
public static class Transformers
{
public static Publisher<Integer> flow(Flux<Integer> f)
{
return f.map(x -> x * 2);
}
}
вы можете подключить этот метод к своему графику следующим образом:
Flux.just(10)
.map(x -> x * -1)
.transform(Transformers::flow)
.subscribe();
Я написал сообщение об этом и других различиях между двумя API, возможно, вы найдете его полезным. Это сообщение от 2019 года, и API развиваются. Например, я упоминаю compose
метод в контексте Flux
, который был переименован с transformDeferred
тех пор, как я написал это, я не уверен, что еще изменилось с тех пор, как я написал сообщение, поэтому имейте в виду: Akka Streams vs Project Reactor API