Поток потока после функции .map() не в порядке

#java #spring-webflux #reactor

Вопрос:

пытаясь ознакомиться с потоком, я хочу, чтобы в потоке был поток данных.

.map() функция должна имитировать рабочую нагрузку, которая требует некоторого случайного времени обработки для каждого Magician объекта и выполняется в отдельных потоках (как вы можете видеть на выходных Thread id данных, чтобы использовать многопоточность.

Наконец, поток должен возвращать результаты по порядку. Но независимо от того, что я делаю, например , использую .sequential() , или где это всегда заканчивается неупорядоченностью.

Может ли кто-нибудь сказать мне, как я могу использовать многопоточность (используя .map() или любую другую последовательность, если задан результат) и привести результаты в порядок, пожалуйста? Спасибо

P.S. Бизнес-логика, лежащая в основе этого вопроса, заключается в том, что я вызываю конечную точку, которая обеспечивает Fluxlt;byte[]gt;

Эти байты мне нужно обработать параллельно (расшифровка) и для этого переслать другому потребителю.

 public class FluxRunner {   public void getWorkerStream() throws InterruptedException {  final int[] counter = new int[1];   // create workers  final Fluxlt;Magician[]gt; workersFlux = Flux.range(0, 10)  .map(integer -gt; {  final Magician[] worker = new Magician[1];  worker[0] = new Magician(counter[0], counter[0]  );  return worker;  });   final Disposable disposable = workersFlux  .parallel()  .runOn(Schedulers.parallel())  .map(workers -gt; {  System.out.println("Thread id: "   Thread.currentThread().getId());  workers[0].calculate();  return workers;  })  .sequential() // no effect, doOnNext is unordered  .doOnNext(workers -gt; System.out.println(workers[0].getId()))  .subscribe();   while (!disposable.isDisposed()) {  sleep(500);  }  } }  @Data class Magician {   final int id;  int number;   Magician(final int id, final int number) {  this.id = id;  this.number = number;  }   public void calculate() {  int timeToSleep = (int) (Math.random() * 3000);  System.out.println("Sleep for "   timeToSleep   " seconds");  try {  sleep(timeToSleep);  } catch (InterruptedException e) {  e.printStackTrace();  }  number  ;  } }  

результатом будет

 Thread id: 33 Thread id: 24 Thread id: 25 Thread id: 31 Thread id: 29 Thread id: 30 Thread id: 27 Thread id: 32 Thread id: 28 Thread id: 26 Sleep for 2861 seconds Sleep for 2811 seconds Sleep for 711 seconds Sleep for 2462 seconds Sleep for 1858 seconds Sleep for 601 seconds Sleep for 126 seconds Sleep for 359 seconds Sleep for 2014 seconds Sleep for 2356 seconds 4 5 7 9 8 3 0 1 6 2  

Комментарии:

1. Если вы не используете что-то похожее на sorted() оператор, я не знаю способа упорядочить поток, который был разделен parallel() . Если какой-то способ действительно существует, вы понимаете, что потенциально отказываетесь от достижений, которых вы достигли, выполняя свои требования, верно? В основном все выполненные задачи должны ждать, пока 1-я задача будет выполнена, затем 2-я задача и т. Д…

Ответ №1:

Вы можете использовать flatMapSequential оператора вместе с .subscribeOn(Schedulers.parallel()) , чтобы получить желаемый результат:

 final Disposable disposable = workersFlux  .flatMapSequential(workers -gt; Mono.fromCallable(() -gt; {  System.out.println("Thread id: "   Thread.currentThread().getId());  workers[0].calculate();  return workers;  }).subscribeOn(Schedulers.parallel()))  .doOnNext(workers -gt; System.out.println(workers[0].getId()))  .subscribe();