Фильтрация RxJS по нескольким потокам

#javascript #angular #rxjs #observable #angularfire2

#javascript #угловой #rxjs #наблюдаемый #angularfire2

Вопрос:

На самом деле я застрял в этой проблеме.

У меня есть поток для событий с использованием Angularfire2.

 this.events$ = this.af.database.list('/events')
    .map(events => {
    const filtered = events.filter(event => event.title === "Title 1");
    return filtered;
    });  

который извлекает мне данные следующим образом:

 {
  title: "Title 1",
  userid: "1"  
}  

Затем у меня есть второй поток для пользователя, подобный этому:

 this.users$ = this.af.database.list('/users/1')
    .map(users => {
    const filtered = users.filter(user => user.name === "Name 1");
        return filtered;
    });  

Теперь я хочу объединить потоки, чтобы получить следующий результат:

Верните мне все события, где event.title === "Title 1" и user.name === "Name 1"

Как это возможно?

Заранее спасибо!

Ответ №1:

Если вы хотите объединить две наблюдаемые, используйте merge() :

Смотрите живую демонстрацию: http://plnkr.co/edit/7j71Qzok7CoFVt49QBs4

 import {Observable, Subject} from 'rxjs';

let stream1 = new Subject();
let stream2 = new Subject();

Observable.merge(stream1, stream2)
  .subscribe(r => {
    console.log(r);
  });

stream1.next(43);
stream2.next(42);
stream1.next(41);
  

Выводит на консоль:

 43
42
41
  

Если вы хотите выдать одно значение после завершения всех наблюдаемых, используйте forkJoin() :

 import {Observable, Subject} from 'rxjs';

let stream1 = new Subject();
let stream2 = new Subject();

Observable.forkJoin(stream1, stream2)
  .subscribe(r => {
    console.log(r);
  });

stream1.next(43);
stream2.next(42);
stream1.next(41);

stream1.complete();
stream2.complete();
  

Выводит на консоль:

 [41, 42]
  

Комментарии:

1. Спасибо за ответ

Ответ №2:

Этот ответ устарел! К сожалению, я не могу его удалить.

Что-то вроде этого работает для меня:

 this.events$ = this.af.database.list('/events')
      .map(events => {
        events.map(event => {
            this.af.database.object('users/'   event.userid)

            .subscribe(user => {
                event.user = user;
              })
            })
             
            const filtered = events.filter(event => event.title === "Test 1")
                .filter(event => event.user.name === "Name 1");;
                return filtered;
        });
  

Я подписываюсь на пользователей внутри моего наблюдаемого события. После этого я сохраняю пользователя в свойстве моего события.

Для меня это нормально!

Обратите внимание: если вы хотите получать обновления вашего пользователя в режиме реального времени, сохраните наблюдаемое в свойстве, а не в объекте user.

Комментарии:

1. Как только ваш вызов subscribe внутри наблюдаемого, вы ошиблись