#c# #.net #system.reactive
#c# #.net #system.реактивный
Вопрос:
Я пытаюсь получить 20 последних значений наблюдаемого и представить его как свойство без возникновения блокировки. На данный момент мой код выглядит так:
class Foo
{
private IObservable<int> observable;
public Foo(IObservable<int> bar)
{
this.observable = bar;
}
public IEnumerable<int> MostRecentBars
{
get
{
return this.observable.TakeLast(20).ToEnumerable();
}
}
}
Однако, когда вызывается средство получения MostRecentBars, это блокируется, предположительно потому, что ToEnumerable не вернется, пока не будет по крайней мере 20 наблюдаемых значений.
Есть ли встроенный способ предоставить максимум 20 самых последних значений наблюдаемого без блокировки? Если наблюдаемых значений меньше 20, он должен просто вернуть их все.
Комментарии:
1. В IObservable<T> нет так называемого метода takeLast
Ответ №1:
Я дам вам два варианта. Используется оператор Rx, Scan
но я думаю, что это немного усложняет чтение. Другой использует стандарт Queue
с блокировкой. Вы можете выбрать.
(1)
class Foo
{
private int[] bars = new int[] { };
public Foo(IObservable<int> bar)
{
bar
.Scan<int, int[]>(
new int[] { },
(ns, n) =>
ns
.Concat(new [] { n, })
.TakeLast(20)
.ToArray())
.Subscribe(ns => bars = ns);
}
public IEnumerable<int> MostRecentBars
{
get
{
return bars;
}
}
}
(2)
class Foo
{
private Queue<int> queue = new Queue<int>();
public Foo(IObservable<int> bar)
{
bar.Subscribe(n =>
{
lock (queue)
{
queue.Enqueue(n);
if (queue.Count > 20)
{
queue.Dequeue();
}
}
});
}
public IEnumerable<int> MostRecentBars
{
get
{
lock (queue)
{
return queue.ToArray();
}
}
}
}
Я надеюсь, что это поможет.
Комментарии:
1. Теперь, увидев, что Илиан Пинзон в значительной степени написал ту же
Queue
реализацию, я бы выбрал этот подход в качестве наилучшего варианта. Удивительно, насколько близки наши решения по этому вопросу. Я не смотрел на него до тех пор, пока не опубликовал свой. Тем не менее,Scan
оператор часто упускается из виду, но он очень полезен во многих ситуациях.2. На самом деле я предпочитаю использовать первый подход
Scan
, он позволяет продолжить наблюдаемую цепочку за пределами «взять последнее (до) n». На самом деле, было бы здорово превратить это в метод расширения для этой цели, т. Е.var recentAverageScore = scores.TakeUpToLast(25).Average();
Или Что-то еще, что вы хотите сделать сIObservable<T>
результатом.
Ответ №2:
Я не могу придумать встроенных операторов Rx, которые соответствуют вашим требованиям. Вы могли бы реализовать это таким образом:
class Foo
{
private IObservable<int> observable;
private Queue<int> buffer = new Queue<int>();
public Foo(IObservable<int> bar)
{
this.observable = bar;
this.observable
.Subscribe(item =>
{
lock (buffer)
{
if (buffer.Count == 20) buffer.Dequeue();
buffer.Enqueue(item);
}
});
}
public IEnumerable<int> MostRecentBars
{
get
{
lock (buffer)
{
return buffer.ToList(); // Create a copy.
}
}
}
}
Ответ №3:
Хотя вы уже получили свой ответ, я думал о решении этой проблемы с помощью Replay Subject с буфером и придумал что-то вроде:
class Foo
{
private ReplaySubject<int> replay = new ReplaySubject<int>(20);
public Foo(IObservable<int> bar)
{
bar.Subscribe(replay);
}
public IEnumerable<int> MostRecentBars
{
get
{
var result = new List<int>();
replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread
return resu<
}
}
}
Дайте мне знать, если это соответствует вашей проблеме.
Ответ №4:
У меня есть несколько расширений, которые я обычно подключаю к любому проекту, который я создаю с помощью реактивных расширений, одним из них является скользящее окно:
public static IObservable<IEnumerable<T>> SlidingWindow<T>(this IObservable<T> o, int length)
{
Queue<T> window = new Queue<T>();
return o.Scan<T, IEnumerable<T>>(new T[0], (a, b) =>
{
window.Enqueue(b);
if (window.Count > length)
window.Dequeue();
return window.ToArray();
});
}
Это возвращает массив из самых последних N элементов (или меньше, если еще не было N элементов).
В вашем случае вы должны быть в состоянии сделать:
class Foo
{
private IObservable<int> observable;
private int[] latestWindow = new int[0];
IDisposable slidingWindowSubscription;
public Foo(IObservable<int> bar)
{
this.observable = bar;
slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a =>
{
latestWindow = a;
});
}
public IEnumerable<int> MostRecentBars
{
get
{
return latestWindow;
}
}
}
Ответ №5:
Серьезно, разве все это не зависит от того, как долго вы готовы ждать? Откуда вы знаете, что наблюдаемое завершено, когда у вас есть 19 элементов? Через секунду вы уверены, что это сделано? десять лет спустя? Вы уверены, что это сделано? Откуда ты знаешь? Это наблюдаемо, поэтому вы должны продолжать наблюдать за ним до тех пор, пока ваши операторы или любые другие монадические преобразования, которые вы применяете, не выполнят какое-либо полезное преобразование с входящим потоком.
Я бы подумал, что (возможно, новое дополнение) окно или буфер с перегрузкой по времени будут работать. Особенно Window, он освобождает Observable от Observable, поэтому после создания внешнего Observable вы можете фактически прослушать первый из 20 элементов, а затем вам нужно внимательно прослушать OnCompleted, иначе вы потеряете весь смысл оператора Window, но вы поняли идею.