#rxjs
Вопрос:
Как правильно в RXJS переназначить наблюдаемое значение в начальное значение таймера, не прерывая исходный поток?
obs.pipe(take(1000), startTimer())
.subscribe(start => {
// show how long it took to finish streaming 1000 values:
const duration = Date.now() - start;
console.log(duration);
});
Я хочу startTimer
переназначить одноразовую подписку start
, но без прерывания исходного потока, т. Е. в этом случае subscribe
она должна запускаться только после завершения потоковой передачи всех 1000 значений.
Как мне это реализовать startTimer
? Предполагается, что это приведет к единовременному Date.now()
значению, которое поможет измерить продолжительность полного потока.
Или, может быть, для этого уже есть стандартное решение, которого мне не хватает?
обновление-1
Ожидаемый результат аналогичен приведенному ниже, но без необходимости создания start
в качестве внешней переменной, и вместо этого сделайте его частью потока:
const start = Date.now();
obs.pipe(take(1000))
.subscribe({
complete() {
const duration = Date.now() - start;
console.log(duration);
}
});
Причина, по которой я хочу сделать его частью потока, заключается в том, что исходные наблюдаемые и подписчики очень сильно отделены друг от друга, например, находятся в несвязанных исходных файлах.
P.S. В качестве альтернативы, решение, которое duration
в конце концов выдаст, также было бы хорошим, если это вообще возможно.
обновление-2
В конце концов, я использовал универсальный drain
операнд, предназначенный для слива наблюдаемого потока, а затем для создания наблюдаемого в конце:
/**
* Drains the source observable till it completes, and then posts a new value-observable.
*/
function drain<T>(value: T | Observable<T> | (() => T | Observable<T>)) {
const v = () => {
const a = typeof value === 'function' ? value.call(null) : value;
return a instanceof Observable ? a : of(a);
}
return s => defer(() => s.pipe(filter(_ => false), c => concat(c, v()))) as Observable<T>;
}
Используя этот операнд, я могу переписать startTimer
так:
const startTimer = () => drain(Date.now);
Ответ №1:
Какой-то код, который делает то, что вы описываете, в значительной степени точно так, как вы его описываете:
function logRunTime<T>(prefix: string): MonoTypeOperatorFunction<T> {
return s => defer(() => {
const start = Date.now();
return s.pipe(
tap({
complete: () => console.log(`${prefix}: ${Date.now() - start}ms`)
})
);
});
}
interval(1000).pipe(
take(10),
logRunTime("Ten Seconds of Interval")
).subscribe(console.log);
Выход:
0
1
2
3
4
5
6
7
8
9
Ten Seconds of Interval: 10014ms
Обновление 1
не заставляйте исходные наблюдаемые значения прекращать излучать […] мы просто не хотим, чтобы исходные значения
Мне кажется, что либо вы продолжаете излучать ценности, либо нет.
Вот версия, которая уменьшает выбросы источника.
Это то, что тебе нужно?
function reduceRunTime<T>(prefix: string): OperatorFunction<T, string> {
return s => defer(() => {
const start = Date.now();
return s.pipe(
filter(_ => false),
c => concat(c, of(null)),
map(_ => `${prefix}: ${Date.now() - start}ms`)
);
}) as Observable<string>;
}
interval(1000).pipe(
take(10),
reduceRunTime("Ten Seconds of Interval")
).subscribe(console.log);
Выход:
Ten Seconds of Interval: 10013ms
Обновление 2
Если вам не нужна строка, она выдаст время начала, как только наблюдаемое завершится.
function startTimer() {
return s => s.pipe(
filter(_ => false),
c => concat(c, of(Date.now()))
) as Observable<number>;
}
Обновление 3
Два отдельных поведения
Я думаю, что обновление 2, возможно, было слишком сильно очищено. Рассмотрим этот пример:
const timed$ = interval(500).pipe(
take(5),
startTimer()
);
const logDiff = (start: number) => console.log(Date.now() - start);
timed$.subscribe(logDiff);
setTimeout(() => {
timed$.subscribe(logDiff);
}, 1000);
setTimeout(() => {
timed$.subscribe(logDiff);
}, 5000);
Вывод:
2521
3507
7511
Примечательно, что наблюдаемые объекты ленивы (ничего не делают, пока не подпишутся), но Date.now
вызываются при создании наблюдаемого объекта. Ваше время начала вполне может быть установлено задолго до того, как начнется наблюдаемое. Для того, чтобы сделать наблюдаемым 2,5 с, по-видимому, потребуется 7,5 с.
Использование отсрочки устраняет эту проблему, так как она не создает наблюдаемое, пока оно не будет подписано.
Обновленный startTimer
function startTimer() {
return s => defer(() => s.pipe(
filter(_ => false),
c => concat(c, of(Date.now()))
)) as Observable<number>;
}
Новый вывод, например, выше:
2521
2507
2511
Теперь вы можете делать забавные вещи, такие как запускать один и тот же наблюдаемый 10 раз и усреднять время выполнения, чтобы получить лучшее представление о том, сколько времени это займет.
const average = arr => arr.reduce( ( p, c ) => p c, 0 ) / arr.length;
concat(...Array.from(Array(10)).map(_ => timed$)).pipe(
map(start => Date.now() - start),
tap(console.log),
toArray()
).subscribe(runs => console.log("Average Runtime: ", average(runs)));
Выход:
2515
2506
2506
2506
2507
2505
2506
2506
2507
2507
Average Runtime: 2507.1
Комментарии:
1. Этот код выдает все исходные значения потока, но мне нужно только начальное значение таймера после завершения исходного потока. Конечно,
duration
ценность вместо этого также была бы хорошей.2. Тогда что вы подразумеваете под «не прерывая исходный поток»?
3.
without interrupting the original stream
= не делайте так, чтобы исходные наблюдаемые значения перестали излучать, потому что мы хотим, чтобы нашиsubscribe
, когда все значения были излучены, мы просто не хотим, чтобы исходные значения в нашем случае, мы хотим только илиstart
илиduration
.4. @vitaly-t — Я думаю, что в очищенной версии есть ошибка, которая просто выдает время начала. Я продемонстрировал, что я имею в виду, в другом обновлении выше. Это легко исправить.
5. @vitaly-t Я не вижу никаких проблем 🙂