Непрерывное уменьшение состояния с помощью Flux

#java #project-reactor

#java #проект-реактор

Вопрос:

Допустим, у меня есть два типа событий ( A и B ) и Flux программы, которые их каким-то образом генерируют:

 Flux<A> aFlux = ...;
Flux<B> bFlux = ...;
  

а также тип, который содержит текущее состояние, обозначаемое типом S :

 class S {
  final int val;
}
  

Я хочу создать следующее:

 final S sInitial = ...;

Flux<S> sFlux = Flux.merge(aFlux, bFlux)
  .scan((a, e) -> {
    if(e instanceof A) {
      return mapA(a, (A)e);
    } else if(e instanceof B) {
      return mapB(a, (B)e);
    } else {
      throw new RuntimeException("invalid event");
    }
  })
  .startWith(sInitial);
  

где sCurr — экземпляр S , который был последним выведен sFlux, начиная с sInitial и mapA / mapB возвращает новое значение типа S . Оба S и sInitial являются неизменяемыми.

То есть я хочу:

  • Непрерывно выводите последнее состояние…
  • … это генерируется …
  • … на основе текущего состояния и полученного события…
  • … как предписано функциями mapper

Есть ли способ реорганизовать вышеупомянутый поток потока каким-либо другим способом, особенно для того, чтобы избежать использования instanceof ?

Ответ №1:

Вы могли бы добавить интерфейс и реализовать его для ваших классов A и B.

 interface ToSConvertible {
    S toS(S s);
}
  

Теперь вы могли бы использовать reactor.core.publisher.Flux#scan(A, java.util.function.BiFunction<A,? super T,A>) метод:

 Flux<S> sFlux = Flux.merge(aFlux, bFlux)
        .scan(sInitial, (s, e) -> e.toS(s));