#java #reactive-programming #project-reactor #iterable
Вопрос:
Я работаю над школьным проектом с использованием Reactor и сталкиваюсь с некоторыми проблемами Flux
.
В восходящем потоке у меня есть a Flux
, который считывает данные из базы данных и генерирует строки для использования нижестоящим процессором. Например:
public Flux<Row> emitRow(...)
{
return Flux.create(cursor -> {
... logic to read and check db ...
emitRow(cursor, row);
cursor.complete()
});
}
Чтение всего в память не идеально, поскольку наша среда изолированной среды имеет ограниченный объем памяти. Итак, у меня есть настройка: каждый раз, когда мы читаем строку, мы обрабатываем ее перед получением следующей строки.
Изначально у меня есть что-то вроде этой настройки, и, похоже, это работает, однако мой инструктор сказал, что нам нужно использовать toIterable()
вместо этого.
public Mono<Outcome> process(Flux<Row> data)
{
return data
.flatMap(row -> processRow(row), SINGLE_THREAD, SINGLE_THREAD)
.map(results -> new Outcome(results));
}
Итак, прямо сейчас я пытаюсь заставить его работать, toIterable()
но у меня возникают проблемы. Это то, что я имею в виду, но он ведет себя не так, как указано в JavaDoc. Похоже, что при этом задействованы 2 очереди. Первый — это очередь блокировки итератора, а второй — из оригинала Flux.create
. И вторая очередь (из Flux.create
), похоже, буферизует все в память, вызывая исключение нехватки памяти в моем приложении.
public Mono<Outcome> process(Flux<Row> data)
{
Iterator<Row> itr = data.toIterable(1).iterator();
return Mono.fromCallable(() -> processRow(itr))
.map(results -> new Outcome(results));
}
public Results processRow(Iterator<Row> itr)
{
while(itr.hasNext()) <--- This is suppose to be blocking
{
Row r = itr.next();
dbContentBuilder.handle(r);
}
return new Results(dbContentBuilder.build());
}
Любая идея, почему или как заставить его работать с iterable, чтобы Flux.create не пытался буферизировать все в синхронизацию, а вместо этого iterator.next() запросит следующий фрагмент.
Документация, похоже, предполагает, что это ленивая очередь и будет блокироваться при запросе .next()
. Документ: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#toIterable-int —
Комментарии:
1. Я удалил ваш побочный вопрос. Вопросы по SO должны быть сосредоточены на одном.
2. Отредактируйте свой ответ, включая реализацию
processRow(itr)
.3. Добавлена соответствующая реализация processRow
4. Вы абсолютно уверены, что правильно поняли своего инструктора? Использование iterable в этом случае не имеет смысла, особенно если вы пытаетесь вернуть издателя (чего вам определенно не следует делать, когда он пытается обернуть блокирующий код.)
5. Да, это то, что спросил инструктор. Сегодня я снова проверил его.