Отмена подписки с помощью RXJS

#javascript #rxjs

#javascript #rxjs

Вопрос:

Я трачу слишком много времени, пытаясь выяснить, какое лучшее решение для исправления этой вложенной подписки. Я безуспешно пробовал mergeMap, flatMap и switchMap. К сожалению, найденные примеры не совсем то, что мне было нужно, поэтому в итоге я получаю только один результат, или неопределенный, или ошибку. Код, который необходимо исправить, является:

 this.myService.getAll().subscribe(res => {

// res.result is an array of 20 objects
res.result.forEach(m => {
    // for each of them I call 2 different endpoints adding a key with the response
    this.myService.checkFirst(m.id).subscribe(result => {
        m.first = resu<
    });
    this.myService.checkSecond(m.id).subscribe(result => {
        m.second = resu<
    });
});
// once all subscriptions are fulfilled I would like to return the mapped array
this.dataLength = res.total;
});
  

Ответ №1:

Попробуйте

 this.myService.getAll().pipe(
  switchMap(res => {
    const obs$ = res.result.map(m => {
      return this.myService.checkFirst(m.id).pipe(
        map(first => ({...m, first})),
      );
    });
  
    return forkJoin(obs$).pipe(
      map(result => ({...res, result})),
    ),
  }),
  switchMap(res => {
    const obs$ = res.result.map(m => {
      return this.myService.checkSecond(m.id).pipe(
        map(second => ({...m, second})),
      );
    });
  
    return forkJoin(obs$).pipe(
      map(result => ({...res, result})),
    ),
  }),
).subscribe(/* ... */);
  

Комментарии:

1. Я бы ожидал, что это будет в два раза медленнее, чем просто выполнение всех вызовов одновременно. Это потому, что вам нужно дождаться checkFirst завершения перед запуском checkSecond .

Ответ №2:

Если я правильно понимаю вашу проблему, я бы поступил следующим образом.

Я предполагаю this.myService.getAll() , что это какой-то http-вызов, поэтому вы хотите что-то сделать, как только этот вызов вернется и соответствующий наблюдаемый завершится. Для этого используется оператор concatMap , который позволяет вам работать с последующими наблюдаемыми, как только исходный, this.myService.getAll() в данном случае, завершается.

Теперь, как только вы получили результат this.myService.getAll() , вам нужно выполнить 2 вызова для каждого элемента в возвращаемом массиве. Такие вызовы могут выполняться параллельно, и каждый из них имеет побочный эффект обновления некоторых свойств элемента.

Для параллельного выполнения 2 вызовов вы можете использовать forkJoin функцию, которая возвращает наблюдаемую, которая выдается, как только оба вызова завершены, и выдает массив с результатами каждого вызова. Другими словами, этот сокращенный код должен выполнять работу для одного элемента

 forkJoin([this.myService.checkFirst(m.id), this.myService.checkSecond(m.id)]).pipe(
   tap(([first, second]) => {
     m.first = first;
     m.second = second;
   })
)
  

Поскольку в вашем массиве 20 элементов, вам нужно выполнить 20 раз вышеуказанную логику, возможно, параллельно. Если это так, вы можете использовать снова forkJoin для выполнения вышеуказанных запросов для каждого элемента в массиве.

Итак, объединив все это вместе, ваше решение может выглядеть примерно так

 this.myService.getAll().pipe(
  concatMap(res => {
    // reqestForItems is an array of Observables, each Observable created by calling the forkJoin that allows us to run the 2 calls in parallel
    const reqestForItems = res.result.map(m => 
      forkJoin([this.myService.checkFirst(m.id), this.myService.checkSecond(m.id)]).pipe(
        tap(([first, second]) => {
          m.first = first;
          m.second = second;
        })
      )
    )
    // return the result of the execution of requests for the items
    return forkJoin(reqestForItems).pipe(
      // since what is requested as result is the array with each item enriched with the data retrieved, you return the res object which has been modified by the above logic
      map(() => res)
    )
  })
)
.subscribe(res => // the res.result is an array of item where each item has been enriched with data coming from the service)
  

Если вам приходится иметь дело с наблюдаемыми и вариантами использования http, вам может показаться интересной эта статья о наблюдаемых и http шаблонах.

Комментарии:

1. Хех. Я только что написал, по сути, то же самое решение. Только я не думаю tap , что обогащать объект, а затем map(() => obj) помещать обогащенный объект в поток, очень чисто (хотя это работает!). Вместо tap этого я бы использовал map и return m; в последней строке. Тогда второму forkJoin вообще не нужен канал.

Ответ №3:

Одна из приятных особенностей RxJS заключается в том, что вы можете вкладывать потоки так глубоко, как вам нравится. Итак, если вы можете создать поток, который обогатит один объект, то вы можете вложить 20 из них, которые обогатят весь массив.

Итак, для одного обогащенного объекта поток, который выводит обогащенный объект на консоль, может выглядеть следующим образом:

 const oneObject = getObject();
forkJoin({
  firstResult: this.myService.checkFirst(oneObject.id),
  secondResult: this.myService.checkSecond(oneObject.id)
}).pipe(
  map(({firstResult, secondResult}) => {
    oneObject.first = firstResu<
    oneObject.second = secondResu<
    return oneObject;
  })
).subscribe(
  console.log
);
  

Как выглядит то же самое, если oneObject оно само возвращается из наблюдаемого? Это то же самое, только теперь мы объединяем или переключаем наш объект в тот же поток, который мы создали выше.

 this.myService.getOneObject().pipe(
  mergeMap(oneObject => 
    forkJoin({
      firstResult: this.myService.checkFirst(oneObject.id),
      secondResult: this.myService.checkSecond(oneObject.id)
    }).pipe(
      map(({firstResult, secondResult}) => {
        oneObject.first = firstResu<
        oneObject.second = secondResu<
        return oneObject;
      })
    )
  )
).subscribe(
  console.log
);
  

Теперь остался один шаг. Чтобы сделать все это для целого массива объектов. Для достижения этой цели нам нужен способ запуска массива наблюдаемых. К счастью, у нас есть forkJoin — тот же оператор, который мы используем для запуска checkFirst и checkSecond одновременно. Он также может объединить все это вместе. Это может выглядеть так:

 this.myService.getAll().pipe(
  map(allRes =>
    allRes.result.map(m => 
      forkJoin({
        first: this.myService.checkFirst(m.id),
        second: this.myService.checkSecond(m.id)
      }).pipe(
        map(({first, second}) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    )
  ),
  // forkJoin our array of streams, so that your 40 service calls (20 for 
  // checkFirst and 20 for checkSecond) are all combined into a single stream.
  mergeMap(mArr => forkJoin(mArr)),
).subscribe(resultArr => {
  // resultArr is an aray of length 20, with objects enriched with a .first
  // and a .second
  // Lets log the result for he first object our array.
  console.log(resultArr[0].first, resultArr[0].second)
});
  

Вот то же решение, в котором я свернул наши map и mergeMap в один mergeMap :

 this.myService.getAll().pipe(
  mergeMap(allRes =>
    forkJoin(allRes.result.map(m => 
      forkJoin({
        first: this.myService.checkFirst(m.id),
        second: this.myService.checkSecond(m.id)
      }).pipe(
        map(({first, second}) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    ))
  )
).subscribe(console.log);
  

Если вы не уверены, что checkFirst и checkSecond завершите, вы можете zip вместо forkJoin , затем отказаться от подписки с take(1) помощью или first()

 this.myService.getAll().pipe(
  mergeMap(allRes =>
    forkJoin(allRes.result.map(m => 
      zip(
        this.myService.checkFirst(m.id),
        this.myService.checkSecond(m.id)
      ).pipe(
        first(),
        map(([first, second]) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    ))
  )
).subscribe(console.log);