Последовательный вызов rxjs с параллельными внутренними вызовами

#typescript #rxjs

#typescript #rxjs

Вопрос:

Используя rxjs, я хотел бы выполнить группу http-вызовов со значением, возвращенным из предыдущего вызова. Я бы хотел, чтобы внутренние вызовы выполнялись параллельно. Мне также нужно вернуть значение из первого вызова, как только оно станет доступным. Мне нужно иметь возможность подписаться на результат и обрабатывать любые ошибки. Ошибки, вызванные внутренними вызовами, не должны приводить к отмене родственных вызовов.

У меня есть следующий код, который работает, но ожидает возврата начального значения до завершения внутренних вызовов из-за использования forkJoin :

 public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
    const response = this.http.post<Customer>('/customer', customer).pipe(mergeMap(
        (created) => {
            return forkJoin([
                of(created),
                forkJoin(groupIds.map(groupId => {
                    const membership = new Membership(created.Id, groupId);
                    return this.http.post<Membership>('/membership', membership);
                )
            ]);
        }
    )).pipe(map(a => a[0]));

    return response;
}

  

Есть ли способ вернуть созданного клиента, не дожидаясь завершения внутренних вызовов? За исключением этого, есть ли способ написать приведенный выше код более кратко?

(Примечание: this.http имеет тип HttpClient из Angular и возвращает Observable<T> )

Ответ №1:

Если вам не нужно использовать значение из параллельных вызовов (как в случае вашего текущего кода):

 public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
  const response = this.http.post <Customer>('/customer', customer)
    .pipe(
      tap(() => groupIds.map(groupId => {
          const membership = new Membership(created.Id, groupId);
          return this.http.post <Membership>('/membership', membership).subscribe();
        ))
      }
    ));

  return response;
}
  

Если вам действительно нужно использовать данные, вы можете использовать startWith для forkJoin

 public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
    const response = this.http.post<Customer>('/customer', customer).pipe(mergeMap(
        (created) => {
            return forkJoin(groupIds.map(groupId => {
                    const membership = new Membership(created.Id, groupId);
                    return this.http.post<Membership>('/membership', membership);
            ).pipe(startWith(created);
        }
    ));

    return response;
}
  

Комментарии:

1. Я не использую значение из параллельных вызовов, но в вашем первом примере created.Id оно больше недействительно, поскольку вы нигде его не фиксируете.

2. При использовании tap в первом примере вместо оператора сопоставления более высокого порядка, что означает подписка на внутреннюю наблюдаемую?

3. Забыл удалить канал, ответ обновился. Я также включил пустую подписку, поскольку вас не волнуют данные из параллельных вызовов

Ответ №2:

Если у вас есть локальная переменная для клиента, вы могли бы сделать что-то вроде этого:

 public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
    const response = this.http.post<Customer>('/customer', customer).pipe(
      tap(created => this.customer = created), // <---
      mergeMap((created) => {
            return forkJoin([
                of(created),
                forkJoin(groupIds.map(groupId => {
                    const membership = new Membership(created.Id, groupId);
                    return this.http.post<Membership>('/membership', membership);
                )
            ])
        }
    )).pipe(map(a => a[0]));

    return response;
}
  

Добавление tap перед mergeMap устанавливает для клиента локальную переменную. Затем это дает вам доступ к созданному клиенту до завершения дочерних операций.

Я выполнил stackblitz здесь: https://stackblitz.com/edit/angular-create-todo-deborahk

(Он использует todo / posts вместо customers / groups, но он должен быть похож по концепции.)

Альтернативно

Вы могли бы сделать что-то вроде этого:

   private todoCreatedSubject = new BehaviorSubject<ToDo>(null);
  todoCreatedAction$ = this.todoCreatedSubject.asObservable();

  private groupSubject = new BehaviorSubject<number[]>([5, 10]);
  groupAction$ = this.groupSubject.asObservable();

  todo$ = this.todoCreatedAction$.pipe(
    mergeMap(todo =>
      this.http.post<ToDo>(this.todoUrl, todo, { headers: this.headers })
    )
  );

  posts$ = combineLatest([this.todo$, this.groupAction$]).pipe(
    switchMap(([created, groupIds]) =>
      forkJoin(
        groupIds.map(groupId => {
          const post = {
            userId: created.userId,
            title: "Post:"   groupId,
            body: created.title
          } as Post;
          return this.http
            .post<Post>(this.postUrl, post, { headers: this.headers })
            .pipe(tap(post => console.log(post)));
        })
      )
    )
  );
  

Он определяет два отдельных потока. todo$ будет похож на вашего клиента и будет доступен сразу после его публикации.

post$ было бы похоже на ваше членство.

Я также создал stackblitz для этого здесь: https://stackblitz.com/edit/angular-create-declarative-todo-deborahk

Ответ №3:

Я понимаю, что вы хотите создать наблюдаемый, который

  1. отправляет клиента, как только он извлекается («Мне также нужно вернуть значение из первого вызова, как только оно станет доступным«), но
  2. использует данные клиента для параллельного выполнения серии последующих HTTP-вызовов для сохранения данных о членстве и отправки после сохранения всех членств (т. Е. Когда возвращаются все параллельные http-вызовы)

Если я правильно понимаю, я бы поступил следующим образом.

Сначала я бы создал функцию, которая принимает customerId и массив groupIds и возвращает наблюдаемый, который реализует параллельные вызовы, используя forkJoin что-то вроде

 function createMemberships(customerId: string, groupIds: number[]) {
  return forkJoin(groupIds.map(gId => 
    createMembershipHttpSimulation(customerId, gId))
  )
}
  

Затем я бы создал вторую функцию, которая возвращает наблюдаемое, которое вы хотите, вот так.

 function create(customer: Customer, groupIds: number[]) {
  const createCustomerObs = createCustomerHttpSimulation(myCustomer).pipe(
      shareReplay(1)
  )
  const createMembershipsObs = createCustomerObs.pipe(
    concatMap(customer => createMemberships(customer.id, groupIds))
  )
  return concat(createCustomerObs, createMembershipsObs)
}
  

Эта функция сначала создает Observable createCustomerObs , который вызывает удаленный API для создания клиента и гарантирует, что его подписка является общедоступной shareReplay(1) . Затем создается второй наблюдаемый, createMembershipsObs , который подключается createCustomerObs к forkJoin требуемому для параллельного запуска запросов на создание членства. Наконец, он объединяет, через concat , createCustomerObs и createMembershipsObs . Результатом этой конкатенации является еще один наблюдаемый объект, который выдается сразу после createCustomerObs отправки, а затем отправляется при createMembershipsObs отправке, т. Е. При завершении параллельных вызовов. Использование shareReplay гарантирует, что API создания клиента вызывается только один раз.

Вот stackblitz, который показывает этот подход. Я использую некоторые задержки, чтобы показать на консоли, что сначала отправляется клиент, а затем приходят членства