#c# #system.reactive
#c# #system.reactive
Вопрос:
Я пытаюсь создать IObservable<T>
из двух массивов IEnumerable
. Я пытаюсь избежать явного перебора массивов и вызова observer.OnNext
. Я наткнулся на Observable.Метод расширения подписки, который на первый взгляд может показаться тем, что мне нужно. Однако это работает не так, как я ожидал, и я не понимаю, почему.
Следующий код является примером:
class Program
{
static void Main(string[] args)
{
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] {'A'};
var secondBytes = new[] {'Z', 'Y'};
firstBytes.Subscribe(observer);
secondBytes.Subscribe(observer);
return Disposable.Empty;
}
);
observable.Subscribe(b => Console.Write(b));
}
}
Результатом этого является «AZ», а не «AZY», как я ожидал. Теперь, если я подпишусь на secondBytes
before firstBytes
, результат будет «ЗАЙ»! Это, по-видимому, предполагает, что он поэтапно перечисляет два массива, что отчасти объясняет вывод «AZ».
Во всяком случае, я в полной растерянности относительно того, почему он ведет себя так, и был бы признателен за любую информацию, которую люди могут предоставить.
Комментарии:
1. Все еще пытаюсь понять, почему это ведет себя так, как оно есть, но на случай, если вам просто нужно исправить :
return firstBytes.Concat(secondBytes).Subscribe(observer);
.2. Просто небольшая подсказка — если когда-нибудь вы обнаружите, что возвращаетесь
Disposable.Empty
, значит, вы делаете что-то не так.3. @Enigmativity — спасибо за совет — в данном случае это скорее побочный продукт простоты примера (или есть более подходящий возврат).
4. @cristobalito — Да, вы бы вернулись
new CompositeDisposable(firstBytes.Subscribe(observer), secondBytes.Subscribe(observer))
.5. @Enigmativity хороший — спасибо
Ответ №1:
Причина поведения итерации с шагом блокировки может быть объяснена реализацией Observable .Subscribe(источник IEnumerable), который использует «рекурсивный» алгоритм, который работает путем вызова e.MoveNext
действия планировщика. Если это успешно, то значение передается, и затем новое действие планировщика ставится в очередь для чтения следующего значения из перечисляемого.
Поскольку вы подписываетесь на две перечислимые и не указываете какой-либо конкретный планировщик для подписки, для этих операций будет использоваться планировщик итераций по умолчанию (определенный SchedulerDefaults.Iteration
), который по умолчанию выполняется в текущем потоке. Это означает, что действия перечисления будут поставлены в очередь для запуска после завершения текущего действия подписки. Это приводит к чередованию действий перечисления — что-то вроде этого
- Первые байты.Subscribe() -> действие перечисления очереди
- Вторые байты.Subscribe() -> действие перечисления очереди
- вызовите firstBytes .MoveNext() -> onNext(«A») -> очередь следующего действия перечисления
- вызовите secondBytes .MoveNext() -> onNext(«Z») -> очередь следующего действия перечисления
- вызовите firstBytes .MoveNext() -> OnCompleted()
- вызовите secondBytes .MoveNext() -> onNext(Y) -> очередное действие перечисления
- вызовите secondBytes .MoveNext() -> OnCompleted()
Наблюдатель получает уведомление OnCompleted() на шаге 5, поэтому оставшиеся шаги перечисления secondBytes игнорируются. Если бы вы вернули свои одноразовые подписки, тогда второе перечисление было бы отменено в этот момент.
Комментарии:
1. Как представляется, это было пропущено в приведенном ниже ответе, но я думаю, что ваш вопрос подразумевает это, все, что вам нужно
firstBytes.Concat(secondBytes)
. Хотя это просто даст вамIEnumerable<char>
, вы все равно можете просто подписаться на IEnumerables. Если вы действительно хотите, вы можете нажать на конец..ToObservable()
НоObservable.Create
в OP нет необходимости.
Ответ №2:
Поскольку вы подписываетесь на две наблюдаемые, в отличие от одной наблюдаемой, которая является объединением двух наблюдаемых, существует два возможных источника, которые могут вызывать метод наблюдателя OnComplete
. Поскольку первый массив короче, он завершается после отправки первого элемента, и наблюдатель отписывается, поскольку он получил уведомление о завершении.
Правильный способ сделать это — объединить две последовательности в одну последовательность, а затем подписаться на нее:
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] { 'A' };
var secondBytes = new[] { 'Z', 'Y' };
return firstBytes.Concat(secondBytes).Subscribe(observer);
});
observable.Subscribe(Console.Write);
Комментарии:
1. Имеет смысл — мог бы поклясться, что я попробовал подход Concat (в моем более сложном случае использования). Спасибо за помощь.
2. Хотя — это
Concat
IEnumerable.Concat
. Чтобы оставаться наIObservable
земле, вероятно, должно бытьfirstBytes.ToObservable().Concat(secondBytes.ToObservable())
3. @cristobalito На самом деле это не имеет никакого функционального значения, по крайней мере, в этом случае.
4. ммм, я бы не сказал «правильный способ сделать это», весь пример в OP является поддельным и является Rx ради Rx. Было бы здорово узнать, почему OP хочет использовать Rx, когда источником является IEnumerable<T> ?
5. Привет, Ли — согласен, пример фиктивный, но это был упрощенный пример по сравнению с проблемой, которую я пытался решить. В любом случае, я думаю, что вы правы, и попытка перенести решение в Rx — это не путь вперед.