Наблюдение с помощью Scheduler.newThread не выполняет so, если onNext наблюдателя заблокирован и продолжается

#c# #.net #linq #system.reactive

#c# #.net #linq #system.reactive

Вопрос:

Может кто-нибудь, пожалуйста, помочь объяснить, почему, когда я «блокирую и продолжаю» последовательность onNext наблюдателя, подписанную на буфер с наблюдаемой во времени последовательностью, этот планировщик.newThread больше не применяется?

Например:

Если я буферизую последовательность чисел через

 var query = from number in Enumerable.Range(1,200)
            select SnoozeNumberProduction(number);

var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));
  

Где SnoozeNumberProduction задерживает генерацию числа на 250 мс

 static int SnoozeNumberProduction(Int32 number)
{
    Thread.Sleep(250);
    return number;
}
  

Теперь, позже, если я подпишусь на bufferedSequence с помощью «observeOn (планировщик.newThread)» так что я блокирую четвертый буфер с помощью консоли.ReadKey

 Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
    Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);

    Thread.Sleep(1000);
    count  ;
    if (count == 4)
    {
        Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
        Console.ReadKey(); // Block and build up the queue
    }

    Console.WriteLine("Woken "   list[0]   " - "   list[list.Count - 1]);
});
  

В этом случае, если я нажму любую клавишу, скажем, через 10 секунд или около того, я увижу, что следующие несколько буферов выполняются в том же управляемом потоке, даже когда планировщик.newThread упоминается в наблюдении. Может кто-нибудь, пожалуйста, помочь объяснить это поведение?

Пример вывода:

 (7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**
  

Ответ №1:

ObserveOn сам по себе является слоем в вашей составленной последовательности, единственная задача которого — переключиться на другой планировщик. Тем не менее, ваши отключения происходят в Select происходящем на IEnumerable . Затем эта последовательность преобразуется в IObservable using ToObservable , значение которого по умолчанию равно Dispatcher.CurrentThread .

Только на этом этапе вы переключаетесь на другой поток для каждого поступающего элемента. Если вы измените его на:

 var query = from number in Enumerable.Range(1,200).ToObservable(Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
  

Теперь перечисление происходит в новом потоке, и, поскольку вы ничего не делаете, чтобы это изменить, оно останется там.

На самом деле есть, Observable.Range который начинается как IObservable и принимает необязательный IDispatcher . Однако я предположил, что ваш источник на самом деле не является Enumerable.Range . В случае, если это так, вот эквивалент:

 var query = from number in Observable.Range(1,200, Dispatcher.NewThread)
            select SnoozeNumberProduction(number);

var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
  

Ответ №2:

Я перекрестно разместил этот вопрос на форуме MSDN Rxhttp://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 и узнал, что это из соображений эффективности


Вы блокируете вызов onNext в Subscribe. Оператор observeOn гарантирует, что onNext будет вызываться как можно больше раз в текущем потоке.Оператор observeOn повторно использует текущий поток для последовательного вызова onNext для такого количества значений, которое доступно в данный момент. Поскольку вы блокируете в Subscribe, будет накапливаться несколько вызовов onNext. После разблокировки вызовы в очереди выполняются в том же потоке. Я считаю, что это делается для того, чтобы избежать накладных расходов на создание нового потока для каждого уведомления, когда в этом нет необходимости.