#angular #rxjs
Вопрос:
Я пытаюсь создать компонент, который в основном выполняет две вещи:
- Разделите файл на более мелкие большие объекты
- Загрузите части файла, как только все части будут загружены, затем выполните вызов API и отметьте элемент как завершенный.
До сих пор мне удавалось создавать сквозной poc, но я пытаюсь улучшить свой код, чтобы загрузить его только n chunks at a time
затем, чтобы перейти к следующему пакету и подождать, пока не будут загружены все фрагменты.
Для логики разделения, которую я использую, bufferCount
forkJoin
, но я хочу иметь возможность вызывать API после завершения всех блоков. Вместо этого он запускается после завершения каждой партии.
- Следующая партия не должна запускаться, если предыдущая партия завершится неудачно.
const apiCallouts = parts.map((part, idx) => {
const formData = new FormData();
formData.append('part', part);
const opts = { ...baseOptions(this.conf) };
delete opts.headers['content-type'];
return this.http.put(// Perform API Callout)
});
const mergeObs = from(apiCallouts).pipe(
bufferCount(5),
concatMap(buffer => forkJoin(buffer))
);
Как бы лучше всего это сделать?
Ответ №1:
Вместо этого он запускается после завершения каждой партии.
Я думаю, что для этого вы могли бы использовать toArray()
оператора:
const mergeObs = from(apiCallouts).pipe(
bufferCount(5),
concatMap(buffer => forkJoin(buffer)),
// `forkJoin` in this case will return an array.
// So, we can use `mergeAll()` to explode the array.
mergeAll(),
// Accumulate everything and emit on `complete`.
toArray(),
);
Следующая партия не должна запускаться, если предыдущая партия завершится неудачно.
Я бы сказал, что здесь все зависит от того, что вы хотите, чтобы произошло в случае сбоя пакета.
Например, если вы хотите завершить весь поток, вы можете использовать что-то следующее:
/* ... */
concatMap(buffer => forkJoin(buffer)),
mergeAll(),
toArray(),
// `NEVER` will immediately emit a `complete` notification.
// If you want your `next` callback to be called, you can replace `NEVER`
// with something like `of('a message')`.
// Note: `err` would come from `concatMap`'s inner observable.
catchError(err => NEVER)
Или вы можете повторить попытку только для того конкретного пакета, который не удался:
/* */
bufferCount(5),
concatMap(
buffer => forkJoin(buffer).pipe(
// Or, you could use `retryWhen` for more flexibility.
retry(3),
)
),
/* ... */