Почему я не могу немедленно отменить подписку на StreamGroup

#dart

#dart

Вопрос:

Я хочу прослушивать события из нескольких Stream источников, пока не получу событие остановки. После этого я хотел бы отказаться от подписки. Я ожидаю, что takeWhile это отменит подписку, но, похоже, она не работает, пока не завершится ожидание Future .

Вот мой код ниже:

 
    void main() async {
      await StreamGroup.merge([_test2(), _test1()])
          .takeWhile((element) => element != -1)
          .forEach((element) {
        print('Element=$element');
      });
      print('Finished!');
    }
    
    Stream<int> _test1() async* {
      for (var i = 0; i < 5; i  ) {
        await Future.delayed(Duration(seconds: 1));
        yield i;
      }
      yield -1;
    }
    
    Stream<int> _test2() async* {
      await longUserAction();
    
      for (var i = 10; i < 20; i  ) {
        await Future.delayed(Duration(seconds: 1));
        yield i;
      }
    
      yield -1;
    }
    
    Future<void> longUserAction() => Future.delayed(Duration(seconds: 20));

  

Что я, кроме:

 Element=0
Element=1
Element=2
Element=3
Element=4
Finished!
  

Что я получил:

 Element=0
Element=1
Element=2
Element=3
Element=4
*long pause*
Finished!
  

Комментарии:

1. Вероятно, потому, что у вас есть то Future , что ждет 20 секунд.

2. @ChristopherMoore, но я не хочу ждать 20 секунд. Я хочу отменить, как только takeWhile предикат удовлетворит

Ответ №1:

Вот решение для моего случая:

 class _SubscriptionData<T> {
  final Stream<T> source;
  final StreamSubscription<T> subscription;
  bool isClosed = false;

  _SubscriptionData(this.source, this.subscription);

  void cancelSubscription() {
    if (!isClosed) {
      isClosed = true;
      subscription.cancel();
    }
  }
}

class _MergeStreamController<T> {
  final StreamController<T> _controller = StreamController<T>();
  int workingStreamsCount = 0;

  _MergeStreamController(List<Stream<T>> sources, bool Function(T) predicate) {
    workingStreamsCount = sources.length;

    final List<_SubscriptionData<T>> subscriptions = sources
        .map((source) => _SubscriptionData(source, source.listen(null)))
        .toList(growable: false);

    void cancelAll() {
      subscriptions.forEach((sub) {
        sub.cancelSubscription();
      });
    }

    subscriptions.forEach((subsData) {
      subsData.subscription.onData((data) {
        if (!predicate(data)) {
          workingStreamsCount = 0;
          _controller.close();
          cancelAll();
        } else if (!_controller.isClosed) _controller.add(data);
      });

      subsData.subscription.onDone(() {
        if (--workingStreamsCount <= 0) _controller.close();
        subsData.cancelSubscription();
      });
    });
  }
}

/// Merges [sources] streams into a single stream channel
/// Stream closes when the first [source] stream emits event which is not satisfying predicate
/// or all streams done its work.
Stream<T> mergeStreamsWhile<T>(
        List<Stream<T>> sources, bool Function(T) takeWhile) =>
    _MergeStreamController(sources, takeWhile)._controller.stream;
  
 void main() async {
  await mergeStreamsWhile(
          [_test2(), _test1(), _test3()], (element) => element != -1)
      .forEach((element) {
    print('Element=$element');
  });
  print('Finished!');
}

Stream<int> _test1() async* {
  for (var i = 0; i < 5; i  ) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }
  yield -1;
}

Stream<int> _test2() async* {
  await longUserAction();

  for (var i = 10; i < 20; i  ) {
    await Future.delayed(Duration(seconds: 1));
    yield i;
  }

  yield -1;
}

Stream<int> _test3() async* {
  return; // Simulate an empty stream
}

Future<void> longUserAction() => Future.delayed(Duration(seconds: 20));
  

Вывод:

 Element=0
Element=1
Element=2
Element=3
Element=4
Finished!