#typescript #rxjs
#typescript #rxjs
Вопрос:
Используя rxjs, я хотел бы выполнить группу http-вызовов со значением, возвращенным из предыдущего вызова. Я бы хотел, чтобы внутренние вызовы выполнялись параллельно. Мне также нужно вернуть значение из первого вызова, как только оно станет доступным. Мне нужно иметь возможность подписаться на результат и обрабатывать любые ошибки. Ошибки, вызванные внутренними вызовами, не должны приводить к отмене родственных вызовов.
У меня есть следующий код, который работает, но ожидает возврата начального значения до завершения внутренних вызовов из-за использования forkJoin
:
public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
const response = this.http.post<Customer>('/customer', customer).pipe(mergeMap(
(created) => {
return forkJoin([
of(created),
forkJoin(groupIds.map(groupId => {
const membership = new Membership(created.Id, groupId);
return this.http.post<Membership>('/membership', membership);
)
]);
}
)).pipe(map(a => a[0]));
return response;
}
Есть ли способ вернуть созданного клиента, не дожидаясь завершения внутренних вызовов? За исключением этого, есть ли способ написать приведенный выше код более кратко?
(Примечание: this.http
имеет тип HttpClient
из Angular и возвращает Observable<T>
)
Ответ №1:
Если вам не нужно использовать значение из параллельных вызовов (как в случае вашего текущего кода):
public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
const response = this.http.post <Customer>('/customer', customer)
.pipe(
tap(() => groupIds.map(groupId => {
const membership = new Membership(created.Id, groupId);
return this.http.post <Membership>('/membership', membership).subscribe();
))
}
));
return response;
}
Если вам действительно нужно использовать данные, вы можете использовать startWith
для forkJoin
public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
const response = this.http.post<Customer>('/customer', customer).pipe(mergeMap(
(created) => {
return forkJoin(groupIds.map(groupId => {
const membership = new Membership(created.Id, groupId);
return this.http.post<Membership>('/membership', membership);
).pipe(startWith(created);
}
));
return response;
}
Комментарии:
1. Я не использую значение из параллельных вызовов, но в вашем первом примере
created.Id
оно больше недействительно, поскольку вы нигде его не фиксируете.2. При использовании tap в первом примере вместо оператора сопоставления более высокого порядка, что означает подписка на внутреннюю наблюдаемую?
3. Забыл удалить канал, ответ обновился. Я также включил пустую подписку, поскольку вас не волнуют данные из параллельных вызовов
Ответ №2:
Если у вас есть локальная переменная для клиента, вы могли бы сделать что-то вроде этого:
public create(customer: Customer, groupIds: number[]): Observerable<Customer> {
const response = this.http.post<Customer>('/customer', customer).pipe(
tap(created => this.customer = created), // <---
mergeMap((created) => {
return forkJoin([
of(created),
forkJoin(groupIds.map(groupId => {
const membership = new Membership(created.Id, groupId);
return this.http.post<Membership>('/membership', membership);
)
])
}
)).pipe(map(a => a[0]));
return response;
}
Добавление tap перед mergeMap устанавливает для клиента локальную переменную. Затем это дает вам доступ к созданному клиенту до завершения дочерних операций.
Я выполнил stackblitz здесь: https://stackblitz.com/edit/angular-create-todo-deborahk
(Он использует todo / posts вместо customers / groups, но он должен быть похож по концепции.)
Альтернативно
Вы могли бы сделать что-то вроде этого:
private todoCreatedSubject = new BehaviorSubject<ToDo>(null);
todoCreatedAction$ = this.todoCreatedSubject.asObservable();
private groupSubject = new BehaviorSubject<number[]>([5, 10]);
groupAction$ = this.groupSubject.asObservable();
todo$ = this.todoCreatedAction$.pipe(
mergeMap(todo =>
this.http.post<ToDo>(this.todoUrl, todo, { headers: this.headers })
)
);
posts$ = combineLatest([this.todo$, this.groupAction$]).pipe(
switchMap(([created, groupIds]) =>
forkJoin(
groupIds.map(groupId => {
const post = {
userId: created.userId,
title: "Post:" groupId,
body: created.title
} as Post;
return this.http
.post<Post>(this.postUrl, post, { headers: this.headers })
.pipe(tap(post => console.log(post)));
})
)
)
);
Он определяет два отдельных потока. todo$
будет похож на вашего клиента и будет доступен сразу после его публикации.
post$
было бы похоже на ваше членство.
Я также создал stackblitz для этого здесь: https://stackblitz.com/edit/angular-create-declarative-todo-deborahk
Ответ №3:
Я понимаю, что вы хотите создать наблюдаемый, который
- отправляет клиента, как только он извлекается («Мне также нужно вернуть значение из первого вызова, как только оно станет доступным«), но
- использует данные клиента для параллельного выполнения серии последующих HTTP-вызовов для сохранения данных о членстве и отправки после сохранения всех членств (т. Е. Когда возвращаются все параллельные http-вызовы)
Если я правильно понимаю, я бы поступил следующим образом.
Сначала я бы создал функцию, которая принимает customerId
и массив groupIds
и возвращает наблюдаемый, который реализует параллельные вызовы, используя forkJoin
что-то вроде
function createMemberships(customerId: string, groupIds: number[]) {
return forkJoin(groupIds.map(gId =>
createMembershipHttpSimulation(customerId, gId))
)
}
Затем я бы создал вторую функцию, которая возвращает наблюдаемое, которое вы хотите, вот так.
function create(customer: Customer, groupIds: number[]) {
const createCustomerObs = createCustomerHttpSimulation(myCustomer).pipe(
shareReplay(1)
)
const createMembershipsObs = createCustomerObs.pipe(
concatMap(customer => createMemberships(customer.id, groupIds))
)
return concat(createCustomerObs, createMembershipsObs)
}
Эта функция сначала создает Observable createCustomerObs
, который вызывает удаленный API для создания клиента и гарантирует, что его подписка является общедоступной shareReplay(1)
. Затем создается второй наблюдаемый, createMembershipsObs
, который подключается createCustomerObs
к forkJoin
требуемому для параллельного запуска запросов на создание членства. Наконец, он объединяет, через concat
, createCustomerObs
и createMembershipsObs
. Результатом этой конкатенации является еще один наблюдаемый объект, который выдается сразу после createCustomerObs
отправки, а затем отправляется при createMembershipsObs
отправке, т. Е. При завершении параллельных вызовов. Использование shareReplay
гарантирует, что API создания клиента вызывается только один раз.
Вот stackblitz, который показывает этот подход. Я использую некоторые задержки, чтобы показать на консоли, что сначала отправляется клиент, а затем приходят членства