Попытка управлять несколькими потоками / Моно, запускать некоторые из них раньше других, комбинировать некоторые из них и немного теряться

#project-reactor #reactor #spring-data-mongodb-reactive

#project-reactor #реактор #spring-data-mongodb-реактивный

Вопрос:

У меня есть модуль, который принимает идентификаторы объектов и «тип разрешения» в качестве параметров, а затем собирает данные (в первую очередь) асинхронно с помощью нескольких операций, которые возвращают потоки. Разрешение разбивается на несколько (опять же, в первую очередь) асинхронных операций, каждая из которых работает над сбором различных типов данных, которые вносят вклад в разрешение. Я говорю «в первую очередь» асинхронно, потому что для некоторых типов разрешений требуются некоторые предварительные операции, которые должны выполняться синхронно, чтобы предоставить информацию для оставшихся операций асинхронного потока разрешения. Теперь, пока выполняется эта синхронная операция, может начаться, по крайней мере, часть общей операции асинхронного разрешения. Я хотел бы запустить эти операции потока во время выполнения синхронных операций. Затем, как только синхронные данные будут разрешены, я могу получить каждый поток для выполнения оставшихся операций. Некоторые типы разрешений будут иметь все операции потока, возвращающие данные, в то время как другие собирают меньше информации, а некоторые операции потока останутся пустыми. Операции разрешения требуют много времени, и я хотел бы иметь возможность запускать некоторые операции потока раньше, чтобы я мог немного сжать время — это очень важно для того, что я выполняю. Подписка So eager идеальна, если я могу гарантировать, что не пропущу ни одного выпуска товара.

Имея это в виду, как я могу:

  1. Создайте «держатель» или «контейнер» для каждой из операций потока, которые понадобятся для разрешения всего, и инициализируйте их как пустые (например Flux.empty() )
  2. Добавьте элементы ко всему, что я могу создать в пункте 1 выше — он был инициализирован как пустой, но мне могут понадобиться данные из одной или нескольких операций с конечным и асинхронным потоком, но я не хочу хранить их отдельно, и они могут отображаться как один поток, когда я буду использовать collectList() их для создания Mono .
  3. Когда некоторые из этих Flux операций должны запускаться раньше других, как я могу их запустить и убедиться, что я не пропустил никаких данных? И если я, например, запустил поток разрешения имен, могу ли я добавить к нему, как в пункте 2 выше? Допустим, я хочу начать извлечение некоторых данных, затем выполнить синхронную операцию, а затем создать другой поток разрешения имен из результата синхронной операции, могу ли я добавить этот новый поток к исходному потоку разрешения имен, поскольку он будет возвращать тот же тип данных? Я в курсе Flux.merge() , но было бы удобно работать с одной ссылкой на поток, к которой я могу продолжать добавлять, если это возможно.

Понадобится ли мне объект коллекции, например список, а затем использовать операцию слияния? Изначально я думал об использовании a ConnectableFlux , пока не понял, что он предназначен для подключения нескольких подписчиков, а не для подключения нескольких издателей. Я думаю, что подключение нескольких издателей было бы хорошим ответом на мои потребности, если только это не обычная схема, с которой можно справиться лучше.

Я занимаюсь реактивным программированием только короткое время, поэтому, пожалуйста, будьте терпеливы к тому, как я пытаюсь описать, что я хочу сделать. Если я смогу лучше прояснить свои намерения, пожалуйста, дайте мне знать, где я был неясен, и я с радостью попытаюсь прояснить это. Заранее спасибо за ваше время и помощь!

РЕДАКТИРОВАТЬ: вот окончательная версия Kotlin, приятная и краткая:

 private val log = KotlinLogging.logger {}

class ReactiveDataService {
    private val createMono: () -> Mono<List<Int>> = {
        Flux.just(9, 8, 7)
            .flatMap {
                Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
                    .parallel()
                    .runOn(Schedulers.boundedElastic())
            }
            .collectList()
            .cache()
    }

    private val processResults: (List<String>, List<String>) -> String =
        { d1, d2 -> "ntdownstream 1: $d1ntdownstream 2: $d2" }

    private val convert: (List<Int>, Int) -> Flux<String> =
        { data, multiplier -> Flux.fromIterable(data.map { String.format("=", it * multiplier) }) }

    fun doQuery(): String? {
        val mono = createMono()
        val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
        val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
        return Mono.zip(downstream1, downstream2, processResults).block()
    }
}

fun main() {
    val service = ReactiveDataService()
    val start = System.currentTimeMillis()
    val result = service.doQuery()
    log.info("{}ntTotal time: {}ms", result, System.currentTimeMillis() - start)
}
 

И результат:

 downstream 1: [ 66,  39,  40,  88,  97,  35,  70,  91,  27,  12,  84,  37,  35,  15,  45,  27,  85,  22,  55,  89,  81,  21,  43,  62]
downstream 2: [132,  78,  80, 176, 194,  70, 140, 182,  54,  24, 168,  74,  70,  30,  90,  54, 170,  44, 110, 178, 162,  42,  86, 124]
Total time: 209ms
 

Ответ №1:

Это звучит как идеальная работа для reactor. Синхронные вызовы могут быть обернуты для возврата в виде потоков (или моноблоков) с использованием эластичного планировщика, позволяющего выполнять их параллельно. Затем, используя различные операторы, вы можете скомпоновать их все вместе, чтобы создать единый поток, который представляет результат. Подпишитесь на этот поток, и вся машина заработает.

Я думаю, вам нужно использовать Mono.flatMapMany вместо Flux.usingWhen.

 public class ReactiveDataService {
  public static void main(final String[] args) {
    ReactiveDataService service = new ReactiveDataService();
    service.doQuery();
  }

  private Flux<Integer> process1(final List<Integer> data) {
    return Flux.fromIterable(data);
  }

  private Flux<Integer> process2(final List<Integer> data) {
    return Flux.fromIterable(data).map(i -> i * 2);
  }

  private String process3(List<Integer> downstream1, List<Integer> downstream2) {
    System.out.println("downstream 1: "   downstream1);
    System.out.println("downstream 2: "   downstream2);
    return "Done";
  }

  private void doQuery() {
    final Mono<List<Integer>> mono =
        Flux.just(9, 8, 7)
            .flatMap(
                limit ->
                    Flux.fromStream(
                            Stream.generate(() -> new Random().nextInt(100))
                                .peek(
                                    i -> {
                                      try {
                                        Thread.sleep(500);
                                      } catch (InterruptedException ignored) {
                                      }
                                    })
                                .limit(limit))
                        .parallel()
                        .runOn(Schedulers.boundedElastic()))
            .collectList()
            .cache();
    final Mono<List<Integer>> downstream1 = mono.flatMapMany(this::process1).collectList();
    final Mono<List<Integer>> downstream2 = mono.flatMapMany(this::process2).collectList();
    Mono.zip(downstream1, downstream2, this::process3).block();
  }
}

 

Комментарии:

1. Спасибо за ответ. Я пробовал это, и в основном это работает хорошо. Проблема, которую мне осталось выяснить, заключается в том, как выполнить несколько операций потока, которые зависят от завершения одной операции потока. Flux.usingWhen() не кажется идеальным, потому что он будет повторно вычислять поток для каждого из зависимых потоков.

2. Вы можете использовать оператор cache() для совместного использования результата исходного потока с несколькими зависимыми потоками

3. @SteveStorck вы должны отредактировать вопрос вместо того, чтобы давать ответ. Обратите внимание, что я вынул .cache(1000), который ничего не делает. Кроме того, окончательный zip-файл упрощается за счет использования версии, оптимизированной для zip-файлов 2 mono с функцией объединения

4. как вы узнаете, когда вам нужно использовать usingWhen против flatMapMany? Достигают ли они разных целей или они оба лучше подходят для разных целей? Это одна из самых сложных частей для меня в отношении реактивного программирования. Так много вариантов, но трудно определить, какой из них лучше.

5. Я удалил ответ, который я добавил. Я добавил это как редактирование к исходному вопросу. Я вижу, что использование flatMapMany намного чище! Ваша помощь гарантирует, что я все делаю правильно в своем рабочем модуле. Теперь вместо блокировки я могу разрешить коду reactor четко координировать все источники, которые используются для разрешения запрошенных данных, без ненужной траты времени, явно блокируя, когда мне нужно получить необходимые данные. Спасибо; у вас есть моя полная благодарность!