Предварительно создайте внутренний Mono как переменную

#java #project-reactor

#java #проект-реактор

Вопрос:

Предположим, что простая реактивная строка потока это:

         Flux.just(1, 2, 3, 4, 5)
                .flatMap(i -> 
                        Mono.just(i)
                                .map( ... map some value)
                                .flatMap( ... async call)
                                .map(... more mappings)
                )
                .doOnNext(log::info)
                .blockLast()
        ;
  

Предположим, что Mono внутри родительского потока .flatMap() представляет собой сложный реактивный поток со сложной логикой. Есть ли способ сохранить этот Mono как переменную, которая может быть передана родительскому потоку? Другими словами, что-то вроде этого:

 final Function<Integer, Publisher<String>> monoPublisher = Mono.something()
                .map( ... map some value)
                .flatMap( ... async call)
                .map(... more mappings);

        Flux.just(1, 2, 3, 4, 5)
                .flatMap(monoPublisher)
                .doOnNext(log::info)
                .blockLast()
        ;
  

Наивный ответ был бы:

 final Function<Integer, Publisher<String>> monoPublisher = i -> Mono.just(i) ...
  

Однако это все равно отложило бы инициализацию Mono до фактической подписки, и для каждого элемента в родительском потоке был бы создан новый Mono.

Я знаком с Mono.create() , но я не вижу, как заставить его работать здесь, не создавая собственную Publisher реализацию, которая обтекает приемник, возвращаемый Mono.create()

РЕДАКТИРОВАТЬ: Спасибо ESala за предложение. Я просто изменил его пример, чтобы использовать ThreadLocal вместо Queue , что не требуется для этого примера с использованием Flux.just(), но предотвратило бы условия гонки при использовании многопоточного производителя

 final ThreadLocal<Integer> threadLocalStorage = new ThreadLocal<>();

        final Mono<String> mono = Mono.fromSupplier(threadLocalStorage::get)
                .map(Objects::toString);

        Flux.just(1, 2, 3, 4, 5)
                .flatMap(i -> {
                    threadLocalStorage.set(i);
                    return mono;
                })
                .doOnNext(System.out::println)
                .blockLast()
        ;
  

Комментарии:

1. Посмотрите Tuples.of в библиотеке кода reactor.

2. Можете ли вы предоставить больше контекста о том, почему наивный ответ не подходит? В общем, не должно иметь значения, что Mono для каждого элемента во внешнем создается новый Flux .

3. это звучит слишком сложно. и если есть map , то, конечно, вам нужно входное значение, поэтому ваш второй фрагмент… просто работает.

4. @ESala — в общем, это не имеет значения, но что, если ваш внутренний Mono имеет 100 операторов и создается программно. Очень неэффективно создавать его для каждого элемента во внешнем потоке. Гораздо эффективнее иметь Mono, который уже создан, поэтому единственным созданным объектом является новая подписка. Когда каждый сервер обрабатывает> 15 Тыс. запросов в секунду, нам нужно использовать любую оптимизацию производительности, и сбор мусора тоже является проблемой.

Ответ №1:

Может быть, что-то подобное сработает, просто замените на LinkedList что-то более причудливое, поддерживающее параллельный доступ:

 Queue<Integer> arguments = new LinkedList<>();

Publisher<String> expensivePublisher =
        Mono.fromSupplier(arguments::remove)
                .map( ... map some value)
                .flatMap( ... async call)
                .map(... more mappings);

Flux.just(1, 2, 3)
        .flatMap(
                i -> {
                    arguments.add(i);
                    return expensivePublisher;
                })
        .blockLast();
  

Но это все равно похоже на взлом. Если во внутреннем действительно есть 100 операторов, тогда я бы предложил попытаться оптимизировать с этой стороны. Улучшения в этой области, вероятно, окажут большее влияние, чем повторное Publisher использование, чтобы избежать его воссоздания.

Комментарии:

1. спасибо за это. Не знал, что Mono.fromSupplier звонит поставщику, чтобы получить значение для каждой подписки, я думал, что он вызывает его один раз при сборке. Это сработает. Я согласен, что это все еще немного взлом, но это намного меньше кода, чем оболочка, которую я написал.