Счет буфера карта объединения: фрагментация файлов

#angular #rxjs

Вопрос:

Я пытаюсь создать компонент, который в основном выполняет две вещи:

  1. Разделите файл на более мелкие большие объекты
  2. Загрузите части файла, как только все части будут загружены, затем выполните вызов API и отметьте элемент как завершенный.

До сих пор мне удавалось создавать сквозной poc, но я пытаюсь улучшить свой код, чтобы загрузить его только n chunks at a time затем, чтобы перейти к следующему пакету и подождать, пока не будут загружены все фрагменты.

Для логики разделения, которую я использую, bufferCount forkJoin , но я хочу иметь возможность вызывать API после завершения всех блоков. Вместо этого он запускается после завершения каждой партии.

  • Следующая партия не должна запускаться, если предыдущая партия завершится неудачно.
 const apiCallouts = parts.map((part, idx) => {
      const formData = new FormData();
      formData.append('part', part);
      const opts = { ...baseOptions(this.conf) };
      delete opts.headers['content-type'];
      return this.http.put(// Perform API Callout)
    });

const mergeObs = from(apiCallouts).pipe(
      bufferCount(5),
      concatMap(buffer => forkJoin(buffer))
    );
 

Как бы лучше всего это сделать?

Ответ №1:

Вместо этого он запускается после завершения каждой партии.

Я думаю, что для этого вы могли бы использовать toArray() оператора:

 const mergeObs = from(apiCallouts).pipe(
  bufferCount(5),
  concatMap(buffer => forkJoin(buffer)),

  // `forkJoin` in this case will return an array.
  // So, we can use `mergeAll()` to explode the array.
  mergeAll(),

  // Accumulate everything and emit on `complete`.
  toArray(),
);
 

Следующая партия не должна запускаться, если предыдущая партия завершится неудачно.

Я бы сказал, что здесь все зависит от того, что вы хотите, чтобы произошло в случае сбоя пакета.

Например, если вы хотите завершить весь поток, вы можете использовать что-то следующее:

 /* ... */
concatMap(buffer => forkJoin(buffer)),
mergeAll(),
toArray(),

// `NEVER` will immediately emit a `complete` notification.
// If you want your `next` callback to be called, you can replace `NEVER`
// with something like `of('a message')`.
// Note: `err` would come from `concatMap`'s inner observable.
catchError(err => NEVER)
 

Или вы можете повторить попытку только для того конкретного пакета, который не удался:

 /*  */

bufferCount(5),
concatMap(
  buffer => forkJoin(buffer).pipe(
    // Or, you could use `retryWhen` for more flexibility.
    retry(3),
  )
),

/* ... */