Наблюдаемый пожар RxJS завершен после ряда асинхронных действий

#javascript #asynchronous #rxjs #observable

#javascript #асинхронный #rxjs #наблюдаемый

Вопрос:

Я пытаюсь создать observable, который выдает значения из ряда асинхронных действий (http-запросы с сервера Jenkins), которые позволят подписчику узнать, как только все действия будут завершены. Я чувствую, что, должно быть, что-то недопонимаю, потому что это не дает того, чего я ожидаю.

 'use strict';

let Rx = require('rx');
let _ = require('lodash');
let values = [
    {'id': 1, 'status': true},
    {'id': 2, 'status': true},
    {'id': 3, 'status': true}
];

function valuesObservable() {

    return Rx.Observable.create(function(observer) {
        _.map(values, function(value) {
            var millisecondsToWait = 1000;
            setTimeout(function() { // just using setTimeout here to construct the example
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
        console.log("valuesObservable Sending onCompleted");
        observer.onCompleted()
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
    // do something with the info
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
    // do something else once done
});

valuesObservable().subscribe(observer);
  

Запустив это, я получаю вывод:

 valuesObservable Sending onCompleted
DONE!
Sending value:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Sending value:  { id: 3, status: true }
  

Хотя я хотел бы видеть что-то более похожее:

 Sending value:  { id: 1, status: true }
Received Data:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Received Data:  { id: 2, status: true }
Sending value:  { id: 3, status: true }
Received Data:  { id: 3, status: true }
valuesObservable Sending onCompleted
DONE!
  

На самом деле меня не волнует порядок элементов в списке, я бы просто хотел, чтобы наблюдатель их получил.

Я полагаю, что происходит то, что Javascript асинхронно запускает функцию тайм-аута и сразу переходит к observer.onCompleted() строке. Как только подписавшийся наблюдатель получает событие OnCompleted (это правильное слово?), Он решает, что это сделано, и избавляется от себя. Затем, когда асинхронные действия завершаются и наблюдаемый срабатывает onNext , наблюдатель больше не существует, чтобы предпринимать с ними какие-либо действия.

Если я прав в этом, я все еще не понимаю, как заставить его вести себя так, как мне хотелось бы. Я наткнулся на антипаттерн, не осознавая этого? Есть ли лучший способ подойти ко всему этому?


Редактировать:

Поскольку я использовал setTimeout для построения своего примера, я понял, что могу использовать его для частичного решения своей проблемы, предоставив наблюдаемому тайм-аут.

 function valuesObservable() {

    return Rx.Observable.create(function(observer) {
        let observableTimeout = 10000;
        setTimeout(function() {
            console.log("valuesObservable Sending onCompleted");
            observer.onCompleted();
        }, observableTimeout);
        _.map(values, function(value) {
            let millisecondsToWait = 1000;
            setTimeout(function() {
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
    });
}
  

Это дает мне всю информацию из наблюдаемого в нужном мне порядке (данные, затем завершение), но в зависимости от выбора времени ожидания я либо могу пропустить некоторые данные, либо мне придется долго ждать события завершения. Это просто неотъемлемая проблема асинхронного программирования, с которой мне приходится жить?

Комментарии:

1. setTimeout является асинхронным, так почему вы ожидаете, что ваша подписка будет ждать его до завершения?

2. @Baumi Я думал, что объяснил, что я не ожидаю, что он будет ждать, но я задаю этот вопрос, чтобы получить помощь в некоторой конструкции, которая заставила бы его ждать. Разве это не ясно? Не могли бы вы предложить, как я мог бы перефразировать это, чтобы лучше понять?

3. извините, приятель, моя ошибка — почему-то я не дочитал сообщение до конца…

4. Не беспокойся, друг.

Ответ №1:

Да, есть лучший способ. Проблема сейчас в том, что вы полагаетесь на временные задержки для своей синхронизации, когда на самом деле вы можете использовать Observable операторы для этого вместо этого.

Первый шаг — отказаться от прямого использования setTimeout . Вместо этого используйте timer

 Rx.Observable.timer(waitTime);
  

Затем вы можете преобразовать массив значений в Observable таким образом, чтобы каждое значение выдавалось как событие, выполнив:

 Rx.Observable.from(values);
  

И, наконец, вы бы использовали flatMap для преобразования этих значений в Observables и сглаживания их в конечную последовательность. Результатом является Observable то, что оно генерируется каждый раз, когда один из источников timers генерирует, и завершается, когда завершаются все исходные наблюдаемые.

 Rx.Observable.from(values)
  .flatMap(
    // Map the value into a stream
    value => Rx.Observable.timer(waitTime),
    // This function maps the value returned from the timer Observable
    // back into the original value you wanted to emit
    value => value
  )
  

Таким образом, полная valuesObservable функция будет выглядеть следующим образом:

 function valuesObservable(values) {
  return Rx.Observable.from(values)
    .flatMap(
      value => Rx.Observable.timer(waitTime),
      value => value
    )
    .do(
      x => console.log(`Sending value: ${value}`),
      null,
      () => console.log('Sending values completed')
    );
}
  

Обратите внимание, что вышеприведенное также сработало бы, если бы вы не использовали демонстрационный поток, т. Е. Если бы у вас были действительно http-потоки, которые вы могли бы даже упростить, используя merge (или concat для сохранения порядка)

 Rx.Observable.from(streams)
    .flatMap(stream => stream);

// OR
Rx.Observable.from(streams).merge();

// Or simply
Rx.Observable.mergeAll(streams);
  

Комментарии:

1. Это здорово, именно то, что я ищу. Кстати, в вашем коде небольшая опечатка: ваш do блок принимает параметр x , но регистрируется value .

Ответ №2:

Лучший способ построить наблюдаемый объект — использовать существующий примитив, а затем комбинацию существующих операторов. Это позволяет избежать нескольких головных болей (отмена подписки, управление ошибками и т.д.). Then Rx.Observable.create , Безусловно, полезно, когда ничто другое не подходит для вашего варианта использования. Интересно generateWithAbsoluteTime , подойдет ли.

В любом случае, здесь проблема, с которой вы сталкиваетесь, заключается в том, что вы завершаете своего наблюдателя перед отправкой ему данных. Итак, в основном вам нужно придумать лучший сигнал завершения. Может быть :

  • завершите x секунд после последнего выданного значения, если новое значение не выдается
  • завершается, когда значение равно некоторому «конечному» значению

Ответ №3:

Благодаря @paulpdaniels, это окончательный код, который сделал то, что я хотел, включая вызовы Jenkins:

 'use strict';

let Rx = require('rx');
let jenkinsapi = require('jenkins'); // https://github.com/silas/node-jenkins/issues
let jenkinsOpts = {
    "baseUrl": "http://localhost:8080",
    "options": {"strictSSL": false},
    "job": "my-jenkins-job",
    "username": "jenkins",
    "apiToken": "f4abcdef012345678917a"
};
let jenkins = jenkinsapi(JSON.parse(JSON.stringify(jenkinsOpts)));

function jobInfoObservable(jenkins, jobName) {
    // returns an observable with a containing a single list of builds for a given job
    let selector = {tree: 'builds[number,url]'};

    return Rx.Observable.fromNodeCallback(function(callback) {
        jenkins.job.get(jobName, selector, callback);
    })();
}

function buildIDObservable(jenkins, jobName) {
    // returns an observable containing a stream of individual build IDs for a given job
    return jobInfoObservable(jenkins, jobName).flatMap(function(jobInfo) {
        return Rx.Observable.from(jobInfo.builds)
    });
}

function buildInfoObservable(jenkins, jobName) {
    // returns an observable containing a stream of http response for each build in the history for this job
    let buildIDStream = buildIDObservable(jenkins, jobName);
    let selector = {'tree': 'actions[parameters[name,value]],building,description,displayName,duration,estimatedDuration,executor,id,number,result,timestamp,url'};

    return buildIDStream.flatMap(function(buildID) {
        return Rx.Observable.fromNodeCallback(function(callback) {
            jenkins.build.get(jobName, buildID.number, selector, callback);
        })();
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
    // do something with the info
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
    // do something else once done
});

buildInfoObservable(jenkins, jenkinsOpts.job).subscribe(observer);
  

Полагаясь на встроенные операторы Rx, мне удалось вообще не возиться с логикой синхронизации. Это также намного чище, чем вложение нескольких Rx.Observable.create операторов.