#c# #iterator #system.reactive
#c# #итератор #system.reactive
Вопрос:
Вот простой пример того, что я пытаюсь сделать с реактивными расширениями, но это не работает
Добавить не работает в этом простом примере
public static void Main(string[] args)
{
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable();
IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Console.WriteLine(Add(obs).ToString());
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
Console.ReadLine();
subscription.Dispose();
}
private static int Add(IObservable<int> wholeList)
{
int sum = 0;
wholeList.ForEach(i => sum = sum i);
return sum;
}
Фактический результат
1
_
Желаемый результат
1
6
2
6
3
6
Sequence Completed
_
т.е. я хотел бы выполнить метод Add (obs) внутри каждой итерации, где obs сам является холодным IObservable, проходящим итерацию?
Комментарии:
1. Скорее всего, вы делаете это неправильно ™ здесь. Можете ли вы описать свой сценарий и то, что вы пытаетесь выполнить на более высоком уровне?
2. @Paul пожалуйста, посмотрите комментарий, который я только что добавил к Enigmativity..
Ответ №1:
Измените это:
IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)
для этого:
IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)
Вы должны отметить, что вы поступаете плохо в том, что касается Rx. Вы перемещаетесь в наблюдаемые объекты и из них. Вы должны избегать этого, где это возможно.
Так, например, избегайте этого:
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable();
когда это то же самое:
var obs = Observable.Range(1, 3);
Кроме того, весь static int Add(IObservable<int> wholeList)
метод плох. Он вызывает ForEach
(что обычно должно быть предупреждением о том, что вы делаете что-то неправильно), чтобы извлечь значения из наблюдаемого. Именно здесь может произойти мертвая блокировка.
Уже существует вызываемое расширение observable Sum
, которое возвращает an IObservble<int>
, и это не выводит вас из наблюдаемого.
Итак, попробуйте написать свой код следующим образом:
var obs = Observable.Range(1, 3);
var query =
from n in obs
from s in obs.Sum()
select new
{
Number = n.ToString(),
Sum = s.ToString(),
};
using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
x =>
{
Console.WriteLine(x.Number);
Console.WriteLine(x.Sum);
},
err =>
Console.WriteLine("Error"),
() =>
Console.WriteLine("Sequence Completed")))
{
Console.ReadLine();
}
Надеюсь, это поможет.
Комментарии:
1. Это прекрасный ответ, и я принимаю ваши очки! К сожалению, мне нужно использовать. subscribeOn(планировщик. newThread) по причинам, не проиллюстрированным здесь, есть ли другой способ выполнить myMethod(obs) в конце каждой итерации?
2. @Cel — я изменил свой пример кода на использование
SubscribeOn
, а неObserveOn
, и он все еще работает. Это пример того, где пребывание в наблюдаемых работает при выходе за их пределы, поэтому вам следует избегать вызова вашегоMyMethod(obs)
.3. я думаю, что я должен был опубликовать предполагаемый myMethod (obs) в первую очередь, потому что Add (obs) на самом деле плохо моделирует сценарий. Желаемый myMethod на самом деле является SaveMessages (obs), поэтому я хотел бы не суммировать целые числа, а в конце каждой отдельной итерации сообщения я хотел бы сохранить ВСЕ сообщения, содержащиеся в obs. и я не хочу сохранять все элементы в obs только при OnCompleted, мне нужно делать это в конце каждой итерации?
4. @Cel — Затем измените подпись на
static IObservable<int> Add(IObservable<int> wholeList)
, а затем используйте стандартные операторы илиObservable.Create<T>(...)
внутри метода.
Ответ №2:
В качестве вашего комментария я бы посоветовал вам сделать observable для генерации элементов по мере необходимости и не делать этого после подписки. В вашем примере вы можете сделать что-то вроде:
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));
obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
Console.WriteLine(t.Item1);
SaveItems(t.Item2);
});