#c# #.net #linq #system.reactive
#c# #.net #linq #system.reactive
Вопрос:
Может кто-нибудь, пожалуйста, помочь объяснить, почему, когда я «блокирую и продолжаю» последовательность onNext наблюдателя, подписанную на буфер с наблюдаемой во времени последовательностью, этот планировщик.newThread больше не применяется?
Например:
Если я буферизую последовательность чисел через
var query = from number in Enumerable.Range(1,200)
select SnoozeNumberProduction(number);
var observableQuery = query.ToObservable();
var bufferedSequence = observableQuery.Buffer(TimeSpan.FromSeconds(2));
Где SnoozeNumberProduction задерживает генерацию числа на 250 мс
static int SnoozeNumberProduction(Int32 number)
{
Thread.Sleep(250);
return number;
}
Теперь, позже, если я подпишусь на bufferedSequence с помощью «observeOn (планировщик.newThread)» так что я блокирую четвертый буфер с помощью консоли.ReadKey
Random random = new Random();
Int32 count = 0;
bufferedSequence.ObserveOn(Scheduler.NewThread).Subscribe(list =>
{
Console.WriteLine("({0}) Numbers from {1}-{2} produced on Thread ID {3}", list.Count, list[0], list[list.Count -1], Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
count ;
if (count == 4)
{
Console.WriteLine("count reached to 4, blocking ... press any key to continue ");
Console.ReadKey(); // Block and build up the queue
}
Console.WriteLine("Woken " list[0] " - " list[list.Count - 1]);
});
В этом случае, если я нажму любую клавишу, скажем, через 10 секунд или около того, я увижу, что следующие несколько буферов выполняются в том же управляемом потоке, даже когда планировщик.newThread упоминается в наблюдении. Может кто-нибудь, пожалуйста, помочь объяснить это поведение?
Пример вывода:
(7) Numbers from 1-7 produced on Thread ID 12
Woken 1 - 7
(9) Numbers from 8-16 produced on Thread ID 14
Woken 8 - 16
(8) Numbers from 17-24 produced on Thread ID 15
Woken 17 - 24
(8) Numbers from 25-32 produced on Thread ID 16
count reached to 4, blocking ... press any key to continue
Woken 25 - 32
(8) Numbers from 33-40 produced on Thread ID **16**
Woken 33 - 40
(8) Numbers from 41-48 produced on Thread ID **16**
Woken 41 - 48
(8) Numbers from 49-56 produced on Thread ID **16**
Woken 49 - 56
(8) Numbers from 57-64 produced on Thread ID **16**
Woken 57 - 64
(8) Numbers from 65-72 produced on Thread ID **16**
Woken 65 - 72
(8) Numbers from 73-80 produced on Thread ID **16**
Woken 73 - 80
(8) Numbers from 81-88 produced on Thread ID **16**
Woken 81 - 88
(8) Numbers from 89-96 produced on Thread ID **16**
Ответ №1:
ObserveOn
сам по себе является слоем в вашей составленной последовательности, единственная задача которого — переключиться на другой планировщик. Тем не менее, ваши отключения происходят в Select
происходящем на IEnumerable
. Затем эта последовательность преобразуется в IObservable
using ToObservable
, значение которого по умолчанию равно Dispatcher.CurrentThread
.
Только на этом этапе вы переключаетесь на другой поток для каждого поступающего элемента. Если вы измените его на:
var query = from number in Enumerable.Range(1,200).ToObservable(Dispatcher.NewThread)
select SnoozeNumberProduction(number);
var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
Теперь перечисление происходит в новом потоке, и, поскольку вы ничего не делаете, чтобы это изменить, оно останется там.
На самом деле есть, Observable.Range
который начинается как IObservable
и принимает необязательный IDispatcher
. Однако я предположил, что ваш источник на самом деле не является Enumerable.Range
. В случае, если это так, вот эквивалент:
var query = from number in Observable.Range(1,200, Dispatcher.NewThread)
select SnoozeNumberProduction(number);
var bufferedSequence = query.Buffer(TimeSpan.FromSeconds(2));
Ответ №2:
Я перекрестно разместил этот вопрос на форуме MSDN Rxhttp://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e72a11-9841-4571-b86d-f805d3aeb7b5 и узнал, что это из соображений эффективности
Вы блокируете вызов onNext в Subscribe. Оператор observeOn гарантирует, что onNext будет вызываться как можно больше раз в текущем потоке.Оператор observeOn повторно использует текущий поток для последовательного вызова onNext для такого количества значений, которое доступно в данный момент. Поскольку вы блокируете в Subscribe, будет накапливаться несколько вызовов onNext. После разблокировки вызовы в очереди выполняются в том же потоке. Я считаю, что это делается для того, чтобы избежать накладных расходов на создание нового потока для каждого уведомления, когда в этом нет необходимости.