Flux параллельный -последовательное выполнение с groupBy

#java #project-reactor

#java #проект-реактор

Вопрос:

Скажем, у меня есть это:

 Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
   .groupBy(i -> i % 3);
  

и скажем, у меня есть метод:

 Mono<Integer> getFromService(Integer i);
  

Я хочу вызывать getFromService параллельно для каждой из групп, но убедитесь, что вызовы являются последовательными в каждой группе.

Для приведенного выше примера это будут три параллельных потока с этими входными значениями:

 stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11
  

Я пробовал это, но это не то, что я хочу:

 Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))
  

Это вызывает службу параллельно для всех целых чисел одновременно. Как мне поступить?

Ответ №1:

Используйте либо concatMap или flatMapSequential вместо внутреннего .flatMap

Если вы хотите последовательное выполнение внутри каждой группы (т. Е. Только Одна подписка getFromService на один раз в каждой группе), затем используйте .concatMap , например:

 Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))
  

Если параллельное выполнение внутри группы в порядке, но вам просто важен порядок, в котором генерируется последовательность, тогда используйте flatMapSequential , например:

 Flux.range(0, 12)
    .groupBy(i -> i % 3)
    .flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))
  

Другой вариант — использовать .flatMap с concurrency аргументом, установленным в 1 , но я бы рекомендовал вместо этого один из вышеперечисленных.