#spring-boot #spring-webflux #project-reactor
#весенняя загрузка #spring-webflux #проект-реактор
Вопрос:
TopicProcessor был удален с версии 3.4 reactor, и теперь я пытаюсь найти самый простой способ заменить его новым API. Я бы хотел, чтобы реализация была максимально надежной, даже если производительность ухудшится. Я не хочу бороться с событиями потерь или переполненным потоком, просто сделайте это глупо простым. Мне также нужно буферизировать данные, поэтому я использовал bufferTimeout
для этого, и это отлично сработало TopicProcessor
. Как добиться этого с помощью нового API?
Я пытался использовать Sinks
, но с большим давлением возникло исключение переполнения, и весь поток был завершен.
Ниже приведена моя базовая реализация перед обновлением до Spring Boot 2.4.1.
package com.example.reactorplayground.config;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class TopicProcessor<T> implements Processor<T> {
private final reactor.core.publisher.TopicProcessor<T> processor;
public TopicProcessor() {
this.processor = reactor.core.publisher.TopicProcessor.share("in-memory-topic-thread", 1);
}
@Override
public Mono<T> add(T event) {
return Mono.fromCallable(() -> {
processor.onNext(event);
return event;
});
}
@Override
public Flux<T> consumeWith() {
return processor;
}
}