Как излучать из потока в полном объеме

#java #spring-webflux #project-reactor

Вопрос:

Я пытаюсь реализовать что-то похожее на Akka Streams statefulMapConcat… В принципе, у меня есть поток оценок примерно так:

Score(LocalDate date, Integer score)

Я хочу взять их и выбрасывать по одному агрегату в день:

ScoreAggregate(LocalDate date, Integer scoreCount, Integer totalScore)

Итак, у меня есть агрегатор, который сохраняет некоторое внутреннее состояние, которое я настроил перед обработкой, и я хочу создать плоскую карту поверх этого агрегатора, который возвращает Моно. Агрегатор будет выдавать Моно со значением только в том случае, если дата изменится, так что вы получите только одно значение в день.

 ScoreAggregator aggregator = ...

Flux<Score> scoreFlux = ...

scoreFlux.flatMap(aggregator::addScore)
 

Поэтому мой вопрос таков… как я могу выделить последний элемент, когда scoreFlux он завершится? У агрегатора будут некоторые данные за последний день, которые еще не были отправлены, и мне нужно их отправить.

Комментарии:

1. Возможно, я не совсем понимаю, но не могли бы вы просто использовать concatWith() после вызова flatmap и добавить другого издателя, которого вы хотите?

2. Ааа… да. Вот что я делаю: .concatWith(Flux.defer(агрегатор::onComplete)), где метод onComplete() вернет Моно конечного агрегата

3. Да, точно — это должно сработать, так как defer() гарантирует, что оно не будет выполнено раньше времени 🙂

Ответ №1:

Повторяю комментарий в качестве ответа, просто чтобы это не выглядело как без ответа:

Поэтому мой вопрос таков… как я могу выделить последний элемент, когда поток результатов завершится?

Вы можете просто использовать concatWith() для объединения издателя, которого вы хотите, после завершения исходного потока. Если вы хотите, чтобы это оценивалось только после завершения работы исходного издателя , убедитесь, что вы завершили его Mono.defer() , что предотвратит упреждающее выполнение.