#java #spring-webflux #project-reactor
#java #spring-webflux #проект-реактор
Вопрос:
Я пытаюсь реализовать поток реактора, созданный из BlockingQueue, но не уверен, какой оператор лучше всего подходит для моего варианта использования?
Я создаю потоковую конечную точку REST, где ответом является поток, который должен продолжать отправлять сообщения из BlockingQueue в качестве ответа на вызов GET REST.
Я уже пробовал форумы и документацию и могу найти только поток, инициированный из повторяющихся коллекций или источников реактивных данных, но никаких примеров из любого BlockingQueue.
Комментарии:
1. Добавьте еще какое-нибудь описание или фрагмент вашего кода, чтобы другие могли понять вашу проблему и помочь вам.
Ответ №1:
Вы можете попробовать Flux#generate и Queue#peek. Просто имейте в виду, что это peek
вернет null
, если очередь пуста, и ее нельзя использовать в onNext
.
Что-то вроде:
Flux.generate(sink -> {
val element = queue.peek();
if (element == null) {
sink.complete();
} else {
sink.next(element);
}
});
Существует также оператор Flux#Repeat When на случай, если вы хотите повторно подписаться на очередь после того, как она считалась пустой, например, с помощью:
flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
Комментарии:
1. Это сработало. Хотя мой клиент (браузер) продолжал отправлять запрос снова и снова, как только сервер сигнализировал о завершении приема. Итак, я изменил на BlockingQueue#take. Не уверен, как это вписывается в реактивную асинхронную обработку.
2. Будьте осторожны, потому что
#take
блокируется. Убедитесь, что вы подписаны на удобный для блокировки планировщик (напримерSchedulers.elastic()
)3. @bsideup, почему здесь
peek()
используется вместоpoll()
для удаления элемента из очереди после его конвейерной обработки?
Ответ №2:
Альтернативный вариант, который стоит рассмотреть, — избавиться от BlockingQueue и вместо этого использовать приемники.
Для этого требуется:
-
Создание приемника, например
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
-
Предоставление приемника в виде потока:
sink.asFlux()
-
Нажатие на приемник:
sink.tryEmitNext("SOME MESSAGE");