#project-reactor
Вопрос:
У меня есть вариант использования, когда я хочу создать группу GroupedFlux с помощью ключа разделения и в каждой группе задержать элементы на 100 миллисекунд. Однако я хочу, чтобы несколько групп начинались одновременно. Поэтому, если есть 3 группы, я ожидаю, что каждые 100 миллисекунд будет выдаваться 3 сообщения. Однако со следующим кодом я вижу только 1 сообщение каждые 100 миллисекунд.
Это код, который я ожидал, что он будет работать.
final Fluxlt;GroupedFluxlt;String, TDatagt;gt; groupedFlux = flux.groupBy(Event::getPartitionKey); groupedFlux.subscribe(g -gt; g.delayElements(Duration.ofMillis(100)) .flatMap(this::doWork) .doOnError(throwable -gt; log.error("error: ", throwable)) .onErrorResume(e -gt; Mono.empty()) .subscribe());
Это журнал.
21:24:29.318 parallel-5] : GroupByKey : 2 21:24:29.424 parallel-6] : GroupByKey : 3 21:24:29.529 parallel-7] : GroupByKey : 1 21:24:29.634 parallel-8] : GroupByKey : 2 21:24:29.739 parallel-9] : GroupByKey : 3 21:24:29.844 parallel-10] : GroupByKey : 1 21:24:29.953 parallel-11] : GroupByKey : 2 21:24:30.059 parallel-12] : GroupByKey : 3 21:24:30.167 parallel-1] : GroupByKey : 1
(Смотрите разницу почти в 100 мс между каждым оператором журнала. столбец 1s-это метка времени.
Ответ №1:
После более детального анализа я обнаружил, что он работает нормально. В моих тестах были неверные данные для ключа раздела, что привело к одному групповому потоку.
Отвечая на мой собственный вопрос на случай, если кто-то когда-нибудь усомнится в том, что элементы задержки работают по-другому в groupedFlux. Это не так.