Группа реакторов с параллелизмом Работает в одном потоке

#spring #spring-webflux #project-reactor #reactor-netty

Вопрос:

Я пытаюсь добиться параллелизма для каждой группы, при котором сгруппированный элемент работает параллельно, а внутри группы каждый элемент работает последовательно. Однако для приведенного ниже кода первый выброс использует параллельный поток, но для последующего выброса он использует какой-то другой пул потоков. Как я могу добиться параллелизма для группы и последовательного выполнения для элемента внутри группы.

 public class ReactorTest implements SmartLifecycle, ApplicationListener<ApplicationReadyEvent> {

    private AtomicInteger counter = new AtomicInteger(1);
    private Many<Integer> healthSink;
    private Disposable dispose;

    private ScheduledExecutorService executor;

    @Override
    public void start() {
        executor = Executors.newSingleThreadScheduledExecutor();
        healthSink = Sinks.many().unicast().onBackpressureBuffer();
        dispose = healthSink.asFlux().groupBy(v -> v % 3 == 0).parallel(10)
                .runOn(Schedulers.newBoundedElastic(10, 100, "k-task")).log().flatMap(v -> v)
                .subscribe(v -> log.info("Data {}", v));
    }

    @Override
    public void stop() {
        executor.shutdownNow();
        if (dispose != null) {
            dispose.dispose();
        }
    }

    @Override
    public boolean isRunning() {
        return executor == null ? false : !executor.isShutdown();
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {

        executor.scheduleAtFixedRate(() -> {
            healthSink.tryEmitNext(counter.incrementAndGet());
            healthSink.tryEmitNext(counter.incrementAndGet());
            healthSink.tryEmitNext(counter.incrementAndGet());
        }, 10, 10, TimeUnit.SECONDS);
    }
}
 

бревно

 2021-07-27 14:15:34.189  INFO 22212 --- [  restartedMain] i.g.kprasad99.reactor.DemoApplication    : Started DemoApplication in 1.464 seconds (JVM running for 1.795)
2021-07-27 14:15:44.206  INFO 22212 --- [       k-task-1] reactor.Parallel.RunOn.1                 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-2] reactor.Parallel.RunOn.1                 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-1] io.github.kprasad99.reactor.ReactorTest  : Data 2
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-2] io.github.kprasad99.reactor.ReactorTest  : Data 3
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-1] io.github.kprasad99.reactor.ReactorTest  : Data 4
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 5
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 6
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 7
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 8
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 9
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 10
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 11
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 12
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 13
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 14
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 15
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 16
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 17
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 18
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 19
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 20
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 21
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 22
2021-07-27 14:16:54.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 23
 

Комментарии:

1. Я думаю, что одна из причин в том, что вы используете Sinks.many().unicast() , и в документах говорится, что это Help building Sinks.Many that will broadcast signals to a single Subscriber так . Я пытался использовать multicast() , но мне пришлось много переделывать ваш код, так как он не работает из коробки. Однако в целом, если я использую простой Flux (не a Disposable ) код, он выполняется в разных потоках даже без .parallel(10) него .

2. @Фелипе, Но у меня есть право на одного подписчика. Более того, это просто пример, в котором я публикую и подписываюсь в одном классе, но намереваюсь публиковать из разных мест. Поскольку я думал, что у него один подписчик, я использовал одноадресную рассылку.

Ответ №1:

Вам нужно поставить .parallel(..) после .flatMap(..) оператора:

 Flux.interval(Duration.ofMillis(100))
  .take(10)
  .groupBy(v -> v % 2 == 0)
  .flatMap(f -> f)
  .parallel(2)
  .runOn(Schedulers.newBoundedElastic(2, 10, "k-task"))
  .subscribe(i -> log.info("Data {}", i));
 

Результат:

 10:32:33.377 [k-task-1] INFO  Data 0
10:32:33.466 [k-task-2] INFO  Data 1
10:32:33.562 [k-task-1] INFO  Data 2
10:32:33.673 [k-task-2] INFO  Data 3
10:32:33.766 [k-task-1] INFO  Data 4
10:32:33.860 [k-task-2] INFO  Data 5
10:32:33.971 [k-task-1] INFO  Data 6
10:32:34.065 [k-task-2] INFO  Data 7
10:32:34.163 [k-task-1] INFO  Data 8
10:32:34.268 [k-task-2] INFO  Data 9