#java #project-reactor
#java #проект-реактор
Вопрос:
Скажем, у меня есть это:
Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
.groupBy(i -> i % 3);
и скажем, у меня есть метод:
Mono<Integer> getFromService(Integer i);
Я хочу вызывать getFromService
параллельно для каждой из групп, но убедитесь, что вызовы являются последовательными в каждой группе.
Для приведенного выше примера это будут три параллельных потока с этими входными значениями:
stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11
Я пробовал это, но это не то, что я хочу:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))
Это вызывает службу параллельно для всех целых чисел одновременно. Как мне поступить?
Ответ №1:
Используйте либо concatMap
или flatMapSequential
вместо внутреннего .flatMap
Если вы хотите последовательное выполнение внутри каждой группы (т. Е. Только Одна подписка getFromService
на один раз в каждой группе), затем используйте .concatMap
, например:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))
Если параллельное выполнение внутри группы в порядке, но вам просто важен порядок, в котором генерируется последовательность, тогда используйте flatMapSequential
, например:
Flux.range(0, 12)
.groupBy(i -> i % 3)
.flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))
Другой вариант — использовать .flatMap
с concurrency
аргументом, установленным в 1
, но я бы рекомендовал вместо этого один из вышеперечисленных.