#c# #.net #system.reactive
#c# #.net #система.реактивная
Вопрос:
Я пытаюсь добиться функциональности, подобной очереди, с помощью Rx (я знаю, что могу выполнить это с помощью Queue Locking, но пытаюсь учиться использовать Rx, поскольку у меня не так много возможностей его использовать).
Функциональность заключается в том, что я хочу предпринять некоторые действия для выполнения события, но хочу обрабатывать только по одному за раз, а после завершения обрабатывать следующий / последний (если он новый).
Это то, что у меня есть на данный момент (используя наблюдаемый флаг увеличения, чтобы я мог это сделать DistinctUntilChanged
, но это похоже на взлом).
// Some source
var events = new Subject<string>();
events.Subscribe(s => Console.WriteLine("Event: " s));
// How can I get rid of this
var counter = 0;
var flag = new Subject<int>();
flag.Subscribe(i => Console.WriteLine("Flag: " i));
var combined = events
.CombineLatest(flag, (s, i) => new {Event = s, Flag = i});
var worker = combined
.DistinctUntilChanged(arg => arg.Flag)
.DistinctUntilChanged(arg => arg.Event);
worker.Subscribe(x => Console.WriteLine("rnDo Work: " x.Event "rn"));
flag.OnNext(counter ); // Ready
events.OnNext("one"); // Idle, Start eight
events.OnNext("two");
events.OnNext("three");
events.OnNext("four");
events.OnNext("five"); // Process
flag.OnNext(counter ); // Finished one, start five
events.OnNext("six");
events.OnNext("seven");
events.OnNext("eight"); // Idle, Start eight
flag.OnNext(counter ); // Finished five, start eight
flag.OnNext(counter ); // Finished eight
events.OnNext("nine"); // Should be processed
Если вы запустите этот код, вы увидите, что последнее событие не обрабатывается, даже если субъект бездействует.
Такое ощущение, что я здесь чего-то не хватает….
На данный момент я смотрю на использование наблюдаемых наблюдаемых каким-то образом …. но пытался разобраться в этом последние пару часов или около того 🙁
Редактировать
Удалось сделать это, изменив subject на ReplaySubject
var events = new ReplaySubject<string>(1);
и вводим еще одну переменную, но это похоже на еще один взлом, но это позволило мне избавиться от счетчика.
string lastSeen = null;
flag.Select(x => events.Where(s => s != lastSeen).Take(1))
.Subscribe(x =>
x.Subscribe(s =>
{
lastSeen = s;
Console.WriteLine(s);
})
);
Если кто-то знает лучший способ, которым я могу избавиться string lastSeen
или упростить свои вложения / подписки, это было бы здорово.
Комментарии:
1. В Rx источник
Observable
не должен зависеть от каких-либоSubscribe
действий.
Ответ №1:
Если вы хотите смешивать блоки потока данных TPL, вы можете делать то, что хотите. Хотя я должен сказать, что это немного не по-русски.
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var queue = new BroadcastBlock<long>(null);
var subscription = queue.AsObservable().DistinctUntilChanged().Subscribe(l =>
{
Thread.Sleep(2500);
Console.WriteLine(l);
});
source.SubscribeOn(ThreadPoolScheduler.Instance).Subscribe(queue.AsObserver());
BroadcastBlock — это очередь, в которой будет храниться только последнее значение. Блоки потока данных TPL действительно имеют хорошую поддержку с помощью Rx. Таким образом, мы можем подписаться на событие и поместить его в BroadcastBlock, а затем подписаться на BroadcastBlock.