Поведение Observable.Подписаться при использовании с несколькими IEnumerables

#c# #system.reactive

#c# #system.reactive

Вопрос:

Я пытаюсь создать IObservable<T> из двух массивов IEnumerable . Я пытаюсь избежать явного перебора массивов и вызова observer.OnNext . Я наткнулся на Observable.Метод расширения подписки, который на первый взгляд может показаться тем, что мне нужно. Однако это работает не так, как я ожидал, и я не понимаю, почему.

Следующий код является примером:

   class Program
  {
    static void Main(string[] args)
    {
      var observable = Observable.Create<char>(observer =>
        {
          var firstBytes = new[] {'A'};
          var secondBytes = new[] {'Z', 'Y'};
          firstBytes.Subscribe(observer);
          secondBytes.Subscribe(observer);

          return Disposable.Empty;
        }
      );

      observable.Subscribe(b => Console.Write(b));
    }
  }
 

Результатом этого является «AZ», а не «AZY», как я ожидал. Теперь, если я подпишусь на secondBytes before firstBytes , результат будет «ЗАЙ»! Это, по-видимому, предполагает, что он поэтапно перечисляет два массива, что отчасти объясняет вывод «AZ».

Во всяком случае, я в полной растерянности относительно того, почему он ведет себя так, и был бы признателен за любую информацию, которую люди могут предоставить.

Комментарии:

1. Все еще пытаюсь понять, почему это ведет себя так, как оно есть, но на случай, если вам просто нужно исправить : return firstBytes.Concat(secondBytes).Subscribe(observer); .

2. Просто небольшая подсказка — если когда-нибудь вы обнаружите, что возвращаетесь Disposable.Empty , значит, вы делаете что-то не так.

3. @Enigmativity — спасибо за совет — в данном случае это скорее побочный продукт простоты примера (или есть более подходящий возврат).

4. @cristobalito — Да, вы бы вернулись new CompositeDisposable(firstBytes.Subscribe(observer), secondBytes.Subscribe(observer)) .

5. @Enigmativity хороший — спасибо

Ответ №1:

Причина поведения итерации с шагом блокировки может быть объяснена реализацией Observable .Subscribe(источник IEnumerable), который использует «рекурсивный» алгоритм, который работает путем вызова e.MoveNext действия планировщика. Если это успешно, то значение передается, и затем новое действие планировщика ставится в очередь для чтения следующего значения из перечисляемого.

Поскольку вы подписываетесь на две перечислимые и не указываете какой-либо конкретный планировщик для подписки, для этих операций будет использоваться планировщик итераций по умолчанию (определенный SchedulerDefaults.Iteration ), который по умолчанию выполняется в текущем потоке. Это означает, что действия перечисления будут поставлены в очередь для запуска после завершения текущего действия подписки. Это приводит к чередованию действий перечисления — что-то вроде этого

  1. Первые байты.Subscribe() -> действие перечисления очереди
  2. Вторые байты.Subscribe() -> действие перечисления очереди
  3. вызовите firstBytes .MoveNext() -> onNext(«A») -> очередь следующего действия перечисления
  4. вызовите secondBytes .MoveNext() -> onNext(«Z») -> очередь следующего действия перечисления
  5. вызовите firstBytes .MoveNext() -> OnCompleted()
  6. вызовите secondBytes .MoveNext() -> onNext(Y) -> очередное действие перечисления
  7. вызовите secondBytes .MoveNext() -> OnCompleted()

Наблюдатель получает уведомление OnCompleted() на шаге 5, поэтому оставшиеся шаги перечисления secondBytes игнорируются. Если бы вы вернули свои одноразовые подписки, тогда второе перечисление было бы отменено в этот момент.

Комментарии:

1. Как представляется, это было пропущено в приведенном ниже ответе, но я думаю, что ваш вопрос подразумевает это, все, что вам нужно firstBytes.Concat(secondBytes) . Хотя это просто даст вам IEnumerable<char> , вы все равно можете просто подписаться на IEnumerables. Если вы действительно хотите, вы можете нажать на конец. .ToObservable() Но Observable.Create в OP нет необходимости.

Ответ №2:

Поскольку вы подписываетесь на две наблюдаемые, в отличие от одной наблюдаемой, которая является объединением двух наблюдаемых, существует два возможных источника, которые могут вызывать метод наблюдателя OnComplete . Поскольку первый массив короче, он завершается после отправки первого элемента, и наблюдатель отписывается, поскольку он получил уведомление о завершении.

Правильный способ сделать это — объединить две последовательности в одну последовательность, а затем подписаться на нее:

 var observable = Observable.Create<char>(observer =>
{
    var firstBytes = new[] { 'A' };
    var secondBytes = new[] { 'Z', 'Y' };

    return firstBytes.Concat(secondBytes).Subscribe(observer);
});

observable.Subscribe(Console.Write);
 

Комментарии:

1. Имеет смысл — мог бы поклясться, что я попробовал подход Concat (в моем более сложном случае использования). Спасибо за помощь.

2. Хотя — это Concat IEnumerable.Concat . Чтобы оставаться на IObservable земле, вероятно, должно быть firstBytes.ToObservable().Concat(secondBytes.ToObservable())

3. @cristobalito На самом деле это не имеет никакого функционального значения, по крайней мере, в этом случае.

4. ммм, я бы не сказал «правильный способ сделать это», весь пример в OP является поддельным и является Rx ради Rx. Было бы здорово узнать, почему OP хочет использовать Rx, когда источником является IEnumerable<T> ?

5. Привет, Ли — согласен, пример фиктивный, но это был упрощенный пример по сравнению с проблемой, которую я пытался решить. В любом случае, я думаю, что вы правы, и попытка перенести решение в Rx — это не путь вперед.