#angular #rxjs #rxjs5 #reactive-extensions-js #angular2-observables
#angular #rxjs #rxjs5 #реактивные расширения-js #angular2-наблюдаемые
Вопрос:
Я создаю приложение Angular2, поэтому я привыкаю к наблюдаемым и реактивным расширениям в целом. Я использую TypeScript и rxjs.
Теперь у меня есть наблюдаемый или, если хотите, поток массива некоторых объектов. Допустим, человек-объекты. Теперь у меня есть два других потока объектов-пользователей, и я хочу объединить их, чтобы я получал поток, который всегда обновляется:
var people$ = getPeople(); // Observable<Person[]>
var personAdded$ = eventHub.personAdded; // Observable<Person>;
var personRemoved$ = eventHub.personRemoved // Observable<Person>;
var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...);
Если поток людей выдает массив, скажем, из 5 человек, и после этого personAdded-stream отправляет person, поток allPeople выдаст массив из 6.
Если personRemoved-stream выдает person, allPeople-stream должен выдавать массив объектов Person без того, который только что был выдан personRemoved-stream .
Есть ли встроенный в rxjs способ добиться такого поведения?
Ответ №1:
Я предлагаю, чтобы вы перенесли идею action
в поток, который затем можно объединить и применить непосредственно к Array
.
Первым шагом является определение некоторых функций, которые описывают ваши действия:
function add(people, person) {
return people.concat([people]);
}
function remove(people, person) {
const index = people.indexOf(person);
return index < 0 ? people : people.splice(index, 1);
}
Примечание: мы избегаем изменения массива на месте, поскольку это может иметь непредвиденные побочные эффекты. Чистота требует, чтобы вместо этого мы создали копию массива.
Теперь мы можем использовать эти функции и переносить их в поток, чтобы создать Observable
объект, который генерирует функции:
const added$ = eventHub.personAdded.map(person => people => add(people, person));
const removed$ = eventHub.personRemoved.map(person => people => remove(people, person));
Теперь мы получаем события в виде: people => people
где вводом и выводом будет массив людей (в этом примере упрощенный до простого массива строк).
Теперь, как бы мы это подключили? Ну, мы действительно заботимся только о добавлении или удалении этих событий после того, как у нас есть массив для их применения:
const currentPeople =
// Resets this stream if a new set of people comes in
people$.switchMap(peopleArray =>
// Merge the actions together
Rx.Observable.merge(added$, removed$)
// Pass in the starting Array and apply each action as it comes in
.scan((current, op) => op(current), peopleArray)
// Always emit the starting array first
.startWith(people)
)
// This just makes sure that every new subscription doesn't restart the stream
// and every subscriber always gets the latest value
.shareReplay(1);
Существует несколько вариантов оптимизации этого метода в зависимости от ваших потребностей (например, избегание каррирования функции или использование двоичного поиска), но я нахожу приведенное выше относительно элегантным для общего случая.
Комментарии:
1. Хорошее объяснение, я собираюсь попробовать это, но я понимаю, что вы говорите! Я бы подумал, что это то, что люди хотят делать чаще, не так ли?
Ответ №2:
Вы хотите объединить все потоки (в стиле охотников за привидениями), а затем использовать оператор сканирования для определения состояния. Оператор сканирования работает как Javascript reduce.
Вот демонстрация…
const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4'];
const initialPeople$ = Rx.Observable.from(initialPeople);
const addPeople = ['Person 5', 'Person 6', 'Person 7'];
const addPeople$ = Rx.Observable.from(addPeople)
.concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async
const removePeople = ['Person 2x', 'Person 4x'];
const removePeople$ = Rx.Observable.from(removePeople)
.delay(5000)
.concatMap(x => Rx.Observable.of(x).delay(1000));
const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$)
mergedStream$
.scan((acc, stream) => {
if (stream.includes('x') amp;amp; acc.length > 0) {
const index = acc.findIndex(person => person === stream.replace('x', ''))
acc.splice(index, 1);
} else {
acc.push(stream);
}
return acc;
}, [])
.subscribe(x => console.log(x))
// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"]
http://jsbin.com/rozetoy/edit?js ,консоль
Вы не упомянули структуру своих данных. Мое использование «x» в качестве флага немного (много) неуклюжее и проблематичное. Но я думаю, вы видите, как вы можете изменить оператор сканирования в соответствии с вашими данными.