Элементы задержки реактора внутри ГруппыфЛюкс задерживает элементы во всех группах

#project-reactor

Вопрос:

У меня есть вариант использования, когда я хочу создать группу GroupedFlux с помощью ключа разделения и в каждой группе задержать элементы на 100 миллисекунд. Однако я хочу, чтобы несколько групп начинались одновременно. Поэтому, если есть 3 группы, я ожидаю, что каждые 100 миллисекунд будет выдаваться 3 сообщения. Однако со следующим кодом я вижу только 1 сообщение каждые 100 миллисекунд.

Это код, который я ожидал, что он будет работать.

 final Fluxlt;GroupedFluxlt;String, TDatagt;gt; groupedFlux =  flux.groupBy(Event::getPartitionKey); groupedFlux.subscribe(g -gt; g.delayElements(Duration.ofMillis(100))  .flatMap(this::doWork)  .doOnError(throwable -gt; log.error("error: ", throwable))  .onErrorResume(e -gt; Mono.empty())  .subscribe());  

Это журнал.

 21:24:29.318 parallel-5] : GroupByKey : 2 21:24:29.424 parallel-6] : GroupByKey : 3 21:24:29.529 parallel-7] : GroupByKey : 1 21:24:29.634 parallel-8] : GroupByKey : 2 21:24:29.739 parallel-9] : GroupByKey : 3 21:24:29.844 parallel-10] : GroupByKey : 1 21:24:29.953 parallel-11] : GroupByKey : 2 21:24:30.059 parallel-12] : GroupByKey : 3 21:24:30.167 parallel-1] : GroupByKey : 1  

(Смотрите разницу почти в 100 мс между каждым оператором журнала. столбец 1s-это метка времени.

Ответ №1:

После более детального анализа я обнаружил, что он работает нормально. В моих тестах были неверные данные для ключа раздела, что привело к одному групповому потоку.

Отвечая на мой собственный вопрос на случай, если кто-то когда-нибудь усомнится в том, что элементы задержки работают по-другому в groupedFlux. Это не так.