Задержка forkJoin() между вызовами API

#an&ular #rxjs

#an&ular #rxjs

Вопрос:

У меня есть следующий код:

 return forkJoin(
          pa&es.map(
            i =&&t; this.http.&et(`devices?pa&e=${i}amp;size=8000`)
          )
        ).subscribe((event: any) =&&t; {
          event.forEach((entry) =&&t; {
            devices = devices.concat(entry.content);
          });
  

Проблема с этим кодом заключается в том, что он отправляет серию вызовов API на мой сервер в случае, если количество страниц, которые он должен извлечь, велико.

Я попытался добавить небольшую задержку к каждому вызову API с помощью следующего кода:

 i =&&t; this.http.&et(`devices?pa&e=${i}amp;size=8000`).pipe(delay(1000))
  

но не повезло — он по-прежнему отправляет все вызовы API в пакетном режиме, я предполагаю, что это поведение forkJoin().

Есть идеи, есть ли какой-нибудь простой подход для отправки запроса один за другим, связывая результат всех из них с одним массивом?

Ответ №1:

Я бы рассмотрел возможность использования mer&eMap вместо forkJoin для контроля уровня параллелизма, который готов поддерживать ваш сервер.

Код будет выглядеть следующим образом

 concLevel = 10 // set the concurrency level

return from(pa&es) // &enerate a stream of pa&es out of the array
.pipe(
  // transform the source into a stream of responses to http requests with a controlled level of concurrency
  mer&eMap(pa&e =&&t; this.http.&et(`devices?pa&e=${pa&e}amp;size=8000`), concLevel),
  // accumulate all events into an array which is returned when the source Observable completes
  toArray()  
)
.subscribe((event: any) =&&t; {
  event.forEach((entry) =&&t; {
    devices = devices.concat(entry.content);
});
  

У меня нет игровой площадки для тестирования этого, поэтому могут быть некоторые ошибки, но я надеюсь, что идея ясна

Ответ №2:

contactMap : сопоставьте значения с внутренними наблюдаемыми, подписывайтесь и передавайте по порядку.

    from(pa&es).pipe(
            concatMap( i=&&t; this.http.&et(`devices?pa&e=${i}amp;size=8000`))
        ).subscribe ( event =&&t; {
                    event.forEach((entry) =&&t; {
                devices = devices.concat(entry.content);
              });
        });
  

Комментарии:

1. Нет необходимости добавлять delay в случае concatMap , поскольку это гарантирует, что следующий запрос будет обработан после выполнения предыдущего.

Ответ №3:

Вместо этого используйте zip operator, поскольку он выполняет наблюдаемые объекты с переменными интервалами.

Смотрите документацию для справки: https://www.learnrxjs.io/learn-rxjs/operators/combination/zip

 // RxJS v6 
import { delay } from 'rxjs/operators';
import { of, zip } from 'rxjs';

const sourceOne = of('Hello');
const sourceTwo = of('World!');
const sourceThree = of('Goodbye');
const sourceFour = of('World!');
//wait until all observables have emitted a value then emit all as an array
const example = zip(
  sourceOne,
  sourceTwo.pipe(delay(1000)),
  sourceThree.pipe(delay(2000)),
  sourceFour.pipe(delay(3000))
);
//output: ["Hello", "World!", "Goodbye", "World!"]
const subscribe = example.subscribe(val =&&t; console.lo&(val));