#dart
#dart
Вопрос:
Я хочу прослушивать события из нескольких Stream
источников, пока не получу событие остановки. После этого я хотел бы отказаться от подписки. Я ожидаю, что takeWhile
это отменит подписку, но, похоже, она не работает, пока не завершится ожидание Future
.
Вот мой код ниже:
void main() async {
await StreamGroup.merge([_test2(), _test1()])
.takeWhile((element) => element != -1)
.forEach((element) {
print('Element=$element');
});
print('Finished!');
}
Stream<int> _test1() async* {
for (var i = 0; i < 5; i ) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
yield -1;
}
Stream<int> _test2() async* {
await longUserAction();
for (var i = 10; i < 20; i ) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
yield -1;
}
Future<void> longUserAction() => Future.delayed(Duration(seconds: 20));
Что я, кроме:
Element=0
Element=1
Element=2
Element=3
Element=4
Finished!
Что я получил:
Element=0
Element=1
Element=2
Element=3
Element=4
*long pause*
Finished!
Комментарии:
1. Вероятно, потому, что у вас есть то
Future
, что ждет 20 секунд.2. @ChristopherMoore, но я не хочу ждать 20 секунд. Я хочу отменить, как только
takeWhile
предикат удовлетворит
Ответ №1:
Вот решение для моего случая:
class _SubscriptionData<T> {
final Stream<T> source;
final StreamSubscription<T> subscription;
bool isClosed = false;
_SubscriptionData(this.source, this.subscription);
void cancelSubscription() {
if (!isClosed) {
isClosed = true;
subscription.cancel();
}
}
}
class _MergeStreamController<T> {
final StreamController<T> _controller = StreamController<T>();
int workingStreamsCount = 0;
_MergeStreamController(List<Stream<T>> sources, bool Function(T) predicate) {
workingStreamsCount = sources.length;
final List<_SubscriptionData<T>> subscriptions = sources
.map((source) => _SubscriptionData(source, source.listen(null)))
.toList(growable: false);
void cancelAll() {
subscriptions.forEach((sub) {
sub.cancelSubscription();
});
}
subscriptions.forEach((subsData) {
subsData.subscription.onData((data) {
if (!predicate(data)) {
workingStreamsCount = 0;
_controller.close();
cancelAll();
} else if (!_controller.isClosed) _controller.add(data);
});
subsData.subscription.onDone(() {
if (--workingStreamsCount <= 0) _controller.close();
subsData.cancelSubscription();
});
});
}
}
/// Merges [sources] streams into a single stream channel
/// Stream closes when the first [source] stream emits event which is not satisfying predicate
/// or all streams done its work.
Stream<T> mergeStreamsWhile<T>(
List<Stream<T>> sources, bool Function(T) takeWhile) =>
_MergeStreamController(sources, takeWhile)._controller.stream;
void main() async {
await mergeStreamsWhile(
[_test2(), _test1(), _test3()], (element) => element != -1)
.forEach((element) {
print('Element=$element');
});
print('Finished!');
}
Stream<int> _test1() async* {
for (var i = 0; i < 5; i ) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
yield -1;
}
Stream<int> _test2() async* {
await longUserAction();
for (var i = 10; i < 20; i ) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
yield -1;
}
Stream<int> _test3() async* {
return; // Simulate an empty stream
}
Future<void> longUserAction() => Future.delayed(Duration(seconds: 20));
Вывод:
Element=0
Element=1
Element=2
Element=3
Element=4
Finished!