Rx — последняя очередь объединения

#c# #.net #system.reactive

#c# #.net #система.реактивная

Вопрос:

Я пытаюсь добиться функциональности, подобной очереди, с помощью Rx (я знаю, что могу выполнить это с помощью Queue Locking, но пытаюсь учиться использовать Rx, поскольку у меня не так много возможностей его использовать).

Функциональность заключается в том, что я хочу предпринять некоторые действия для выполнения события, но хочу обрабатывать только по одному за раз, а после завершения обрабатывать следующий / последний (если он новый).

Это то, что у меня есть на данный момент (используя наблюдаемый флаг увеличения, чтобы я мог это сделать DistinctUntilChanged , но это похоже на взлом).

         // Some source
        var events = new Subject<string>();
        events.Subscribe(s => Console.WriteLine("Event: "   s));

        // How can I get rid of this
        var counter = 0;

        var flag = new Subject<int>();
        flag.Subscribe(i => Console.WriteLine("Flag: "   i));
        var combined = events
            .CombineLatest(flag, (s, i) => new {Event = s, Flag = i});

        var worker = combined
            .DistinctUntilChanged(arg => arg.Flag)
            .DistinctUntilChanged(arg => arg.Event);

        worker.Subscribe(x => Console.WriteLine("rnDo Work: "   x.Event   "rn"));


        flag.OnNext(counter  ); // Ready
        events.OnNext("one"); // Idle, Start eight

        events.OnNext("two");
        events.OnNext("three");
        events.OnNext("four");

        events.OnNext("five"); // Process
        flag.OnNext(counter  ); // Finished one, start five

        events.OnNext("six");
        events.OnNext("seven");

        events.OnNext("eight"); // Idle, Start eight
        flag.OnNext(counter  ); // Finished five, start eight
        flag.OnNext(counter  ); // Finished eight
        
        events.OnNext("nine"); // Should be processed
  

Если вы запустите этот код, вы увидите, что последнее событие не обрабатывается, даже если субъект бездействует.

Такое ощущение, что я здесь чего-то не хватает….

На данный момент я смотрю на использование наблюдаемых наблюдаемых каким-то образом …. но пытался разобраться в этом последние пару часов или около того 🙁

Редактировать

Удалось сделать это, изменив subject на ReplaySubject

         var events = new ReplaySubject<string>(1);
  

и вводим еще одну переменную, но это похоже на еще один взлом, но это позволило мне избавиться от счетчика.

         string lastSeen = null;
        flag.Select(x => events.Where(s => s != lastSeen).Take(1))
            .Subscribe(x =>
                x.Subscribe(s =>
                {
                    lastSeen = s;
                    Console.WriteLine(s);
                })
            );
  

Если кто-то знает лучший способ, которым я могу избавиться string lastSeen или упростить свои вложения / подписки, это было бы здорово.

Комментарии:

1. В Rx источник Observable не должен зависеть от каких-либо Subscribe действий.

Ответ №1:

Если вы хотите смешивать блоки потока данных TPL, вы можете делать то, что хотите. Хотя я должен сказать, что это немного не по-русски.

 var source = Observable.Interval(TimeSpan.FromSeconds(1));
var queue = new BroadcastBlock<long>(null);
var subscription = queue.AsObservable().DistinctUntilChanged().Subscribe(l =>
{
    Thread.Sleep(2500);
    Console.WriteLine(l);
});

source.SubscribeOn(ThreadPoolScheduler.Instance).Subscribe(queue.AsObserver());
  

BroadcastBlock — это очередь, в которой будет храниться только последнее значение. Блоки потока данных TPL действительно имеют хорошую поддержку с помощью Rx. Таким образом, мы можем подписаться на событие и поместить его в BroadcastBlock, а затем подписаться на BroadcastBlock.