#java #spring #project-reactor
#java #весна #проект-реактор
Вопрос:
В приложении я использую длительный опрос для внешней конечной точки HTTP. Я делаю это с помощью Spring reactive WebClient
. Чтобы полностью завершить работу при остановке приложения (и избежать уродливых трассировок стека Netty), я использую takeUntil()
с экземпляром an EmitterProcessor
, который я вызываю onNext()
, когда Spring останавливает мой компонент (я реализую SmartLifecycle
).
Все это работает примерно так:
@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
private boolean running = true;
private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
private final BackendMessageReceiver backendMessageReceiver;
public void waitForMessages() {
Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
.repeat()
.takeUntilOther(shutdown)
.subscribe(event -> {
// do something when the http endpoint answers
});
}
@Override
public int getPhase() {
// We need to cancel the subscriptions before Reactor/Netty shuts down.
// Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
return 0;
}
@Override
public void start() {
// Not needed
}
@Override
public void stop() {
log.info("Stopping message subscriptions");
shutdown.onNext(true);
shutdown.onComplete();
running = false;
}
@Override
public boolean isRunning() {
return running;
}
}
Пока весь механизм работает нормально. Однако EmitterProcessor
помечено как @Deprecated
, и javadoc говорит использовать Sink
вместо. Sink
не реализует Publisher
интерфейс и, следовательно, не может быть передан takeUntilOther()
.
Что я должен сделать, чтобы решить эту проблему, не застряв на Project Reactor <3.5 навсегда?
Ответ №1:
Sinks
предназначены в качестве API, ориентированного на разработчика, для программного запуска реактивных событий. Это было бы не очень полезно, если бы не было способа представить их как типичные Flux
или Mono
для остальной части приложения.
Sinks.Many
имеет в asFlux()
виду этот эффект. Аналогично, Sinks.One
и Sinks.Empty
есть asMono()
представление.
Это то, что вы можете использовать для перехода takeUntilOther
.