Как я могу создать поток реактора из очереди блокировки?

#java #spring-webflux #project-reactor

#java #spring-webflux #проект-реактор

Вопрос:

Я пытаюсь реализовать поток реактора, созданный из BlockingQueue, но не уверен, какой оператор лучше всего подходит для моего варианта использования?

Я создаю потоковую конечную точку REST, где ответом является поток, который должен продолжать отправлять сообщения из BlockingQueue в качестве ответа на вызов GET REST.

Я уже пробовал форумы и документацию и могу найти только поток, инициированный из повторяющихся коллекций или источников реактивных данных, но никаких примеров из любого BlockingQueue.

Комментарии:

1. Добавьте еще какое-нибудь описание или фрагмент вашего кода, чтобы другие могли понять вашу проблему и помочь вам.

Ответ №1:

Вы можете попробовать Flux#generate и Queue#peek. Просто имейте в виду, что это peek вернет null , если очередь пуста, и ее нельзя использовать в onNext .

Что-то вроде:

 Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});
  

Существует также оператор Flux#Repeat When на случай, если вы хотите повторно подписаться на очередь после того, как она считалась пустой, например, с помощью:

 flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
  

Комментарии:

1. Это сработало. Хотя мой клиент (браузер) продолжал отправлять запрос снова и снова, как только сервер сигнализировал о завершении приема. Итак, я изменил на BlockingQueue#take. Не уверен, как это вписывается в реактивную асинхронную обработку.

2. Будьте осторожны, потому что #take блокируется. Убедитесь, что вы подписаны на удобный для блокировки планировщик (например Schedulers.elastic() )

3. @bsideup, почему здесь peek() используется вместо poll() для удаления элемента из очереди после его конвейерной обработки?

Ответ №2:

Альтернативный вариант, который стоит рассмотреть, — избавиться от BlockingQueue и вместо этого использовать приемники.

Для этого требуется:

  1. Создание приемника, например

     private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
      
  2. Предоставление приемника в виде потока:

     sink.asFlux()
      
  3. Нажатие на приемник:

     sink.tryEmitNext("SOME MESSAGE");