#an&ular #rxjs
#an&ular #rxjs
Вопрос:
У меня есть следующий код:
return forkJoin(
pa&es.map(
i =&&t; this.http.&et(`devices?pa&e=${i}amp;size=8000`)
)
).subscribe((event: any) =&&t; {
event.forEach((entry) =&&t; {
devices = devices.concat(entry.content);
});
Проблема с этим кодом заключается в том, что он отправляет серию вызовов API на мой сервер в случае, если количество страниц, которые он должен извлечь, велико.
Я попытался добавить небольшую задержку к каждому вызову API с помощью следующего кода:
i =&&t; this.http.&et(`devices?pa&e=${i}amp;size=8000`).pipe(delay(1000))
но не повезло — он по-прежнему отправляет все вызовы API в пакетном режиме, я предполагаю, что это поведение forkJoin().
Есть идеи, есть ли какой-нибудь простой подход для отправки запроса один за другим, связывая результат всех из них с одним массивом?
Ответ №1:
Я бы рассмотрел возможность использования mer&eMap
вместо forkJoin
для контроля уровня параллелизма, который готов поддерживать ваш сервер.
Код будет выглядеть следующим образом
concLevel = 10 // set the concurrency level
return from(pa&es) // &enerate a stream of pa&es out of the array
.pipe(
// transform the source into a stream of responses to http requests with a controlled level of concurrency
mer&eMap(pa&e =&&t; this.http.&et(`devices?pa&e=${pa&e}amp;size=8000`), concLevel),
// accumulate all events into an array which is returned when the source Observable completes
toArray()
)
.subscribe((event: any) =&&t; {
event.forEach((entry) =&&t; {
devices = devices.concat(entry.content);
});
У меня нет игровой площадки для тестирования этого, поэтому могут быть некоторые ошибки, но я надеюсь, что идея ясна
Ответ №2:
contactMap : сопоставьте значения с внутренними наблюдаемыми, подписывайтесь и передавайте по порядку.
from(pa&es).pipe(
concatMap( i=&&t; this.http.&et(`devices?pa&e=${i}amp;size=8000`))
).subscribe ( event =&&t; {
event.forEach((entry) =&&t; {
devices = devices.concat(entry.content);
});
});
Комментарии:
1. Нет необходимости добавлять
delay
в случаеconcatMap
, поскольку это гарантирует, что следующий запрос будет обработан после выполнения предыдущего.
Ответ №3:
Вместо этого используйте zip
operator, поскольку он выполняет наблюдаемые объекты с переменными интервалами.
Смотрите документацию для справки: https://www.learnrxjs.io/learn-rxjs/operators/combination/zip
// RxJS v6
import { delay } from 'rxjs/operators';
import { of, zip } from 'rxjs';
const sourceOne = of('Hello');
const sourceTwo = of('World!');
const sourceThree = of('Goodbye');
const sourceFour = of('World!');
//wait until all observables have emitted a value then emit all as an array
const example = zip(
sourceOne,
sourceTwo.pipe(delay(1000)),
sourceThree.pipe(delay(2000)),
sourceFour.pipe(delay(3000))
);
//output: ["Hello", "World!", "Goodbye", "World!"]
const subscribe = example.subscribe(val =&&t; console.lo&(val));