Миграция из TopicProcessor

#spring-boot #spring-webflux #project-reactor

#весенняя загрузка #spring-webflux #проект-реактор

Вопрос:

TopicProcessor был удален с версии 3.4 reactor, и теперь я пытаюсь найти самый простой способ заменить его новым API. Я бы хотел, чтобы реализация была максимально надежной, даже если производительность ухудшится. Я не хочу бороться с событиями потерь или переполненным потоком, просто сделайте это глупо простым. Мне также нужно буферизировать данные, поэтому я использовал bufferTimeout для этого, и это отлично сработало TopicProcessor . Как добиться этого с помощью нового API?

Я пытался использовать Sinks , но с большим давлением возникло исключение переполнения, и весь поток был завершен.

Ниже приведена моя базовая реализация перед обновлением до Spring Boot 2.4.1.

 package com.example.reactorplayground.config;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class TopicProcessor<T> implements Processor<T> {
    private final reactor.core.publisher.TopicProcessor<T> processor;

    public TopicProcessor() {
        this.processor = reactor.core.publisher.TopicProcessor.share("in-memory-topic-thread", 1);
    }

    @Override
    public Mono<T> add(T event) {
        return Mono.fromCallable(() -> {
            processor.onNext(event);
            return event;
        });
    }

    @Override
    public Flux<T> consumeWith() {
        return processor;
    }
}