Объединить несколько наблюдаемых объектов с помощью разных действий / операций

#angular #rxjs #rxjs5 #reactive-extensions-js #angular2-observables

#angular #rxjs #rxjs5 #реактивные расширения-js #angular2-наблюдаемые

Вопрос:

Я создаю приложение Angular2, поэтому я привыкаю к наблюдаемым и реактивным расширениям в целом. Я использую TypeScript и rxjs.

Теперь у меня есть наблюдаемый или, если хотите, поток массива некоторых объектов. Допустим, человек-объекты. Теперь у меня есть два других потока объектов-пользователей, и я хочу объединить их, чтобы я получал поток, который всегда обновляется:

 var people$ = getPeople();                  // Observable<Person[]>
var personAdded$ = eventHub.personAdded;    // Observable<Person>;
var personRemoved$ = eventHub.personRemoved // Observable<Person>;

var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...);
 

Если поток людей выдает массив, скажем, из 5 человек, и после этого personAdded-stream отправляет person, поток allPeople выдаст массив из 6.
Если personRemoved-stream выдает person, allPeople-stream должен выдавать массив объектов Person без того, который только что был выдан personRemoved-stream .

Есть ли встроенный в rxjs способ добиться такого поведения?

Ответ №1:

Я предлагаю, чтобы вы перенесли идею action в поток, который затем можно объединить и применить непосредственно к Array .

Первым шагом является определение некоторых функций, которые описывают ваши действия:

 function add(people, person) {
  return people.concat([people]);
}

function remove(people, person) {
  const index = people.indexOf(person);
  return index < 0 ? people : people.splice(index, 1);
}
 

Примечание: мы избегаем изменения массива на месте, поскольку это может иметь непредвиденные побочные эффекты. Чистота требует, чтобы вместо этого мы создали копию массива.

Теперь мы можем использовать эти функции и переносить их в поток, чтобы создать Observable объект, который генерирует функции:

 const added$ = eventHub.personAdded.map(person => people => add(people, person));
const removed$ = eventHub.personRemoved.map(person => people => remove(people, person));
 

Теперь мы получаем события в виде: people => people где вводом и выводом будет массив людей (в этом примере упрощенный до простого массива строк).

Теперь, как бы мы это подключили? Ну, мы действительно заботимся только о добавлении или удалении этих событий после того, как у нас есть массив для их применения:

 const currentPeople = 

  // Resets this stream if a new set of people comes in
  people$.switchMap(peopleArray => 

    // Merge the actions together 
    Rx.Observable.merge(added$, removed$)

      // Pass in the starting Array and apply each action as it comes in
      .scan((current, op) => op(current), peopleArray)

      // Always emit the starting array first
      .startWith(people)
  )
  // This just makes sure that every new subscription doesn't restart the stream
  // and every subscriber always gets the latest value
  .shareReplay(1);
 

Существует несколько вариантов оптимизации этого метода в зависимости от ваших потребностей (например, избегание каррирования функции или использование двоичного поиска), но я нахожу приведенное выше относительно элегантным для общего случая.

Комментарии:

1. Хорошее объяснение, я собираюсь попробовать это, но я понимаю, что вы говорите! Я бы подумал, что это то, что люди хотят делать чаще, не так ли?

Ответ №2:

Вы хотите объединить все потоки (в стиле охотников за привидениями), а затем использовать оператор сканирования для определения состояния. Оператор сканирования работает как Javascript reduce.

Вот демонстрация…

 const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4'];

const initialPeople$ = Rx.Observable.from(initialPeople);

const addPeople = ['Person 5', 'Person 6', 'Person 7'];

const addPeople$ = Rx.Observable.from(addPeople)
            .concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async

const removePeople = ['Person 2x', 'Person 4x'];

const removePeople$ = Rx.Observable.from(removePeople)
                                              .delay(5000)
                                                .concatMap(x => Rx.Observable.of(x).delay(1000));

const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$)

mergedStream$
  .scan((acc, stream) => {
        if (stream.includes('x') amp;amp; acc.length > 0) {
            const index = acc.findIndex(person => person === stream.replace('x', ''))
            acc.splice(index, 1);
        } else {
            acc.push(stream);
        }
      return acc;
  }, [])
  .subscribe(x => console.log(x))

// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"]
 

http://jsbin.com/rozetoy/edit?js ,консоль

Вы не упомянули структуру своих данных. Мое использование «x» в качестве флага немного (много) неуклюжее и проблематичное. Но я думаю, вы видите, как вы можете изменить оператор сканирования в соответствии с вашими данными.