rxjs: загрузить много для многих из 2 конечных точек и объединить в одну наблюдаемую

#rxjs #rxjs6 #rxjs-observables #rxjs-pipeable-operators

#rxjs #rxjs6 #rxjs-наблюдаемые #rxjs-pipeable-operators

Вопрос:

У меня есть одна конечная точка для получения списка пользователей, а другая для получения списка квартир для каждого пользователя:

 getUsers() => Observable<User[]>;
getUserApartments(userId) => Observable<Apartment[]>`
  

Как мне объединить данные из двух в одну наблюдаемую:

 const usersWithApartments$: Observable<{ user: User, apartments: Apartment[] }[]> = getUsers().pipe(
    // users.map(user => { user, apartments: getUserApartments(user.id) }) <-- turn this pseudocode into Rxjs
);
  

Ответ №1:

Поскольку getUsers() тип возвращаемого значения — Users[] я предполагаю, что желаемый тип — это коллекция { user: User, apartments: Apartment[]> , а не один.

Пожалуйста, рассмотрите приведенную ниже реализацию:

 class UserApartments {
  user: User;
  apartments: Apartment[];
}
  
 const usersWithApartments$: Observable<UserApartments[]> = getUsers()
  .pipe(
    switchMap(users => forkJoin(
      users.map(user => getUserApartments(user.id).pipe(
        map(apartments => ({ user, apartments }))
      ))
    ))
  );
  

Комментарии:

1. правильно! исправлено в вопросе. И спасибо за решение!

Ответ №2:

Это оно:

 const toUserWithApartmentsStream = (user: User) => getUserApartments(user.id).pipe(
  map((apartments: Apartment[]) => ({ user, apartments }))
);

const usersWithApartments$: Observable<{ user: User, apartments: Apartment[] }[]> =
  getUsers().pipe(
    switchMap((users: User[]) => users),
    concatMap((user: User) => toUserWithApartmentsStream(user)),
    toArray()
  );
  

Прямо сейчас getUserApartments будут выполняться последовательно один за другим, я мог бы написать mergeMap вместо concatMap и getUserApartments будут выполняться одновременно. Спасибо Рафи Хенигу за комментарий

Функция toUserWithApartmentsStream — это просто помощник, так что код выглядит чище. Я мог бы просто записать это встроенным внутри concatMap .

Комментарии:

1. Ваше решение страдает от недостатка параллелизма

2. @RafiHenig Что, если автор не хочет одновременно отправлять множество запросов в api? Я бы не сказал, что этому ответу не хватает параллелизма, я бы сказал, что этот конкретный ответ будет выполнять запросы последовательно, чтобы избежать одновременной отправки слишком большого количества запросов и ухудшения ситуации. Что касается альтернативы, я мог бы написать mergeMap вместо switchMap , и мой ответ будет иметь параллелизм.

3. @Goga Koreli обычно требуется параллелизм, кроме того mergeMap , не гарантирует, что объединенная наблюдаемая будет выдавать ответы api в том же порядке, в каком поступают элементы из источника

4. Я согласен, обычно это так. В сценарии реального мира для этого конкретного примера, если есть много пользователей, это приведет к одновременной отправке большого количества сетевых вызовов, а если есть несколько аутентифицированных пользователей, выполняющих это одновременно, это будет еще больше. Таким образом, сильно ударяя по серверной части. Кстати, я согласен, что mergeMap не гарантирует порядок прибытия, но я не вижу причин упоминать об этом, когда результатом в обоих случаях является массив.