Rxjava объединяет несколько obsevables без чередования их потока

#rx-java #fifo

#rx-java #fifo

Вопрос:

поток выборки данных (обновлен) — каждый поток использует свой собственный поток (Scheduler.io )

Я новичок в Rxjava, я не могу понять, как выразить красные и желтые стрелки способом Rxjava.

Трудности для меня заключаются в следующем:

  1. Соберите все наблюдаемые данные, но без чередования. количество

  2. наблюдаемое — это переменная, а не фиксированное число в красной стрелке.

Я читал книги и искал, но все больше и больше головной боли, я не знаю, в какую сторону мне идти. Для части с красной стрелкой я мог бы просто использовать очередь блокировки, как обычно, но, возможно, лучше, чтобы вся цепочка была написана способом Rxjava.

Комментарии:

1. Почему вы говорите, что нет concat? это именно то, что вам нужно

2. Это связано с тем, что concat находится в фиксированном порядке, всегда определяемом порядком параметров. Но в моем случае я должен забрать первое поступление, которое может быть не первым по параметрам.

3. итак, вы действительно хотите слияние, которое задерживает другие наблюдаемые события до завершения текущего? может быть, если вы опишете, что вы действительно пытаетесь сделать, вам станет легче помочь

4. @Daniele Segato Каждый поток данных на прикрепленном изображении выполняется параллельно, я бы назвал subscribOn (Scheduler.io ()) чтобы выделить для него поток.

5. Я все еще не думаю, что вы объясняете, что вам нужно.

Ответ №1:

Я не уверен, что пока могу понять вопрос.

Предполагая, что это вопрос:

  • вы начинаете с задач, создающих список данных List<DATA> , назовем это пакетом данных;
  • все эти задачи выполняются параллельно в пуле потоков;
  • вы получаете эти пакеты и обрабатываете их
  • вас не волнует порядок пакетов, но вы заботитесь о внутреннем порядке элементов в этих пакетах
  • вы пытаетесь перенести это в RxJava

Насколько я могу судить по вашему вопросу, вы уже начали, и ваша задача теперь Observable<Data> выполняется вместо List<DATA> .

Это первое, что я бы изменил: поскольку вы заботитесь о внутреннем порядке пакетов, я бы сохранил список и, таким образом, вернул вашу задачу Observable<List<DATA>> .

Таким образом, вы получите кучу Observable<List<DATA>> , по одному для каждой параллельной задачи:

 List<Observable<List<DATA>>> tasks = ...;
List<Observable<List<DATA>>> parallelTask = new Arraylist(tasks.size());
for (Observable<List<DATA>> task : tasks) {
    // execute all the task in a separate thread
    // change Schedulers to your own pool schedulers if you want
    parallelTask.add(task.scheduleOn(Schedulers.io()));
}
Observable.merge(parallelTasks)
    .concatMap(batch -> Observable.from(batch)) // read below
    .observeOn(Schedulers.computation()) // or whichever thread you want to process the data
    .subscribe(data -> {
        // process data as it comes
    }, throwable -> {
        // handle error
    });
  

Это то, что он делает

  1. сначала объедините все наблюдаемые пакеты в один наблюдаемый
  2. затем укажите поток, в котором должна произойти подписка
  3. затем concatMap преобразуйте batch ( List<DATA> ) в поток событий типа DATA с сохранением их порядка (вы не будете получать данные из разных пакетов, пока этот не закончится)
  4. затем укажите поток, в котором вы хотите просматривать свои данные и обрабатывать их
  5. наконец, он подписывается на данные для их обработки

Комментарии:

1. List<Observable<List<DATA>>> parallelTasks = ...; Observable.merge(parallelTasks) .subscribeOn(Schedulers.io()) // use your own schedulers with a Thread Pool if you prefer .concatMap(batch -> Observable.from(batch)) // read below .observeOn(Schedulers.computation()) // or whichever thread you want to process the data .subscribe(data -> { // process data as it comes }, throwable -> { // handle error });

2. Извините, я нажимаю enter для ввода приведенных выше комментариев по ошибке. 1 В вашем решении все параллельные задачи выполняются в разных потоках планировщика. Пул ввода-вывода? Если нет, то как я могу это сделать? 2 Если я добавлю тайм-аут (10 секунд) перед subscirbe(), я получу TimerOutExecption , могу ли я остановить эти параллельные задачи, просто вызвав unsubscribe() ?

3. 1. это зависит от того, как вы создаете эти наблюдаемые. 2. Я изменил ответ, чтобы убедиться, что все они выполняются параллельно 3. когда больше нет подписчиков, наблюдаемый обычно отписывается, если он не является подключаемым наблюдаемым. Сбой должен вызвать отмену подписки на все базовые Obsevable. Но отмена их подписки не означает, что они перестают выполняться, это зависит от того, как они создаются.