Обработка последовательных http-вызовов с помощью RxJS и получение ответа, как только он поступает

#node.js #ajax #rxjs

#node.js #ajax #rxjs

Вопрос:

Я пытаюсь написать службу JS, которая потенциально будет отправлять несколько вызовов во внешний API одновременно, и, поскольку скорость API ограничена, я хочу распределять вызовы и обрабатывать ответы отдельно.

Я мог бы использовать concatMap или mergeMap (с опцией параллелизма), но я понятия не имею, как обрабатывать ответы, как я упоминал.

Если service A , service B и service C указать HTTP-службе отправить 3 отдельных запроса, мне нужна функция в service A , чтобы получить первый ответ в своем собственном канале.

http.js:

 let subscribe;
const observable$ = Observable.create((sub) => {
    subscribe = sub;
});

function generateRequest(ajaxParams){
    subscribe.next(ajaxParams);
}

observable$
    .pipe(
        mergeMap((params) => {
           return ajax(params);
        }, response => response, 2)
    )
    .subscribe((response) => {
        console.log(response);
    });

  

observable$ будучи потоком, куда я отправляю http-параметры, канал обрабатывает создание экземпляра ajax и задержки / параллелизм.

Итак, мой вопрос в том, как бы мне сделать что-то подобное:

serviceA.js:

 generateRequest(ajaxParams).pipe(map(response => response.data), tap((data) => {
    console.log('Here is the data', data);
})).subscribe(() => {})
  

Комментарии:

1. Вы отправляете запрос с узла или из браузера?

2. @AdrianBrand от node

Ответ №1:

Вы можете добиться этого, объединив все ваши наблюдаемые без предварительной подписки на них, как показано ниже:

 const service_A_Observable = generateRequest(ajaxParams)
  .pipe(
    map(response => response.data),
    tap((data) => {
     // handle stream here...
    })
  );

const service_B_Observable = generateRequest(ajaxParams)
  .pipe(
    map(response => response.data),
    tap((data) => {
        // handle stream here...
    })
  );

from([service_A_Observable, service_B_Observable]).pipe(concatMap(obs => obs)).subscribe();
  

Или, при необходимости, настройка желаемого параллелизма:

 from([...]).pipe(mergeMap(o => o, MAX_PARALLEL)).subscribe();