Как сделать несколько подписок на один порт приема dart

#flutter #asynchronous #stream #dart-isolates

Вопрос:

У меня есть задача создания N — изолятов и одного основного порта приема, который будет передан этим изолятам, и с помощью этого порта приема я передаю всю информацию, необходимую функции, чтобы дать мне желаемый результат. Это работает в первый раз, однако, любое последовательное сообщение для этих изолятов, и их получение приводит к такой ошибке

Это мой код

   //main receive port
  var controlPort = ReceivePort();

  ...
  class _MainScreenState extends State<MainScreen> {
  final ImagePicker _picker = ImagePicker();

  Future<ui.Image>? image;

  @override
  void initState() {
    for (int i = 0; i < Platform.numberOfProcessors; i  ) {
      Future<Isolate> isolate =
          Isolate.spawn(Core.readAndMapFast, controlPort.sendPort);
    }
  }

  @override
  Widget build(BuildContext context) {

  ...
  static void readAndMapFast(SendPort sendPort) async {
    //sending SendPort of a created isolate to the main thread
    ReceivePort receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);

    receivePort.first.then((message) {
    //listening for a message coming from main thread and populating it with all needed 
    //data
      message as Arguments;
      Arguments arguments = Arguments(message.imgList, message.lutList,
          message.imgHeight, message.imgWidth, message.widthLut, message.order);
    //ecexuting all needed operations and after that sending message to the main thread
    ...
      sendPort.send(output);

    //in the main thread we are listening for messages from RecievePort we created at 
    //the very beginning

   await for (dynamic message in controlPort.asBroadcastStream()) {
      if (message is SendPort) {

   //waiting for a message from isolates and if its type is SendPort i.e isolate sending 
   //its first message we are sending needed arguments to a function

        message.send(args[i]);
      } else {
        message as FinalList;
        counter  ;
        postEditList.add(message);
        if (counter == numberOfWorkers) {
          Stopwatch timerr = Stopwatch();
          timerr.start();
          for (int t = 0; t < postEditList.length; t  ) {
            FinalList list =
                postEditList.where((element) => element.order == t).first;
            bytesBuilder.add(list.imgSubList);
          }
          break;
        }
      }
    }
 

Я предполагаю, что я должен использовать ReceivePort().asBroadcastStream (), однако я не могу отправлять сообщения и делать с ним другие вещи. Потому что после того, как он станет потоковым, у нас не будет SendPort.

Ответ №1:

Я нашел решение. Это довольно просто. У нас есть один основной порт приема, который мы будем использовать для связи между изолятами и основным потоком, а также мы создаем широковещательный поток, который будет использоваться для прослушивания любых обновлений из нашего порта приема.

 var controlPort = ReceivePort();
var broadcastStream = controlPort.asBroadcastStream();
 

Тогда все, что вам нужно сделать, это прослушать обновления broadcastStream, а не ControlPort с помощью

 broadcastStream.listen((event) {});
 

Я надеюсь, что это поможет тем, кто застрял на той же проблеме