доступ к IObservable внутри той же IObservable подписки

#c# #iterator #system.reactive

#c# #итератор #system.reactive

Вопрос:

Вот простой пример того, что я пытаюсь сделать с реактивными расширениями, но это не работает

Добавить не работает в этом простом примере

     public static void Main(string[] args)
    {
        var list = new List<int> { 1, 2, 3 };
        var obs = list.ToObservable();
        IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Console.WriteLine(Add(obs).ToString());
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
        Console.ReadLine();
        subscription.Dispose();
    }

    private static int Add(IObservable<int> wholeList)
    {
        int sum = 0;
        wholeList.ForEach(i => sum = sum   i);
        return sum;
    }
  

Фактический результат

 1
_
  

Желаемый результат

 1
6
2
6
3
6
Sequence Completed
_
  

т.е. я хотел бы выполнить метод Add (obs) внутри каждой итерации, где obs сам является холодным IObservable, проходящим итерацию?

Комментарии:

1. Скорее всего, вы делаете это неправильно ™ здесь. Можете ли вы описать свой сценарий и то, что вы пытаетесь выполнить на более высоком уровне?

2. @Paul пожалуйста, посмотрите комментарий, который я только что добавил к Enigmativity..

Ответ №1:

Измените это:

 IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)
  

для этого:

 IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)
  

Вы должны отметить, что вы поступаете плохо в том, что касается Rx. Вы перемещаетесь в наблюдаемые объекты и из них. Вы должны избегать этого, где это возможно.

Так, например, избегайте этого:

     var list = new List<int> { 1, 2, 3 };
    var obs = list.ToObservable();
  

когда это то же самое:

     var obs = Observable.Range(1, 3);
  

Кроме того, весь static int Add(IObservable<int> wholeList) метод плох. Он вызывает ForEach (что обычно должно быть предупреждением о том, что вы делаете что-то неправильно), чтобы извлечь значения из наблюдаемого. Именно здесь может произойти мертвая блокировка.

Уже существует вызываемое расширение observable Sum , которое возвращает an IObservble<int> , и это не выводит вас из наблюдаемого.

Итак, попробуйте написать свой код следующим образом:

 var obs = Observable.Range(1, 3);

var query =
    from n in obs
    from s in obs.Sum()
    select new
    {
        Number = n.ToString(),
        Sum = s.ToString(),
    };

using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
    x =>
        {
            Console.WriteLine(x.Number);
            Console.WriteLine(x.Sum);
        },
    err =>
        Console.WriteLine("Error"),
    () =>
        Console.WriteLine("Sequence Completed")))
{
    Console.ReadLine();
}
  

Надеюсь, это поможет.

Комментарии:

1. Это прекрасный ответ, и я принимаю ваши очки! К сожалению, мне нужно использовать. subscribeOn(планировщик. newThread) по причинам, не проиллюстрированным здесь, есть ли другой способ выполнить myMethod(obs) в конце каждой итерации?

2. @Cel — я изменил свой пример кода на использование SubscribeOn , а не ObserveOn , и он все еще работает. Это пример того, где пребывание в наблюдаемых работает при выходе за их пределы, поэтому вам следует избегать вызова вашего MyMethod(obs) .

3. я думаю, что я должен был опубликовать предполагаемый myMethod (obs) в первую очередь, потому что Add (obs) на самом деле плохо моделирует сценарий. Желаемый myMethod на самом деле является SaveMessages (obs), поэтому я хотел бы не суммировать целые числа, а в конце каждой отдельной итерации сообщения я хотел бы сохранить ВСЕ сообщения, содержащиеся в obs. и я не хочу сохранять все элементы в obs только при OnCompleted, мне нужно делать это в конце каждой итерации?

4. @Cel — Затем измените подпись на static IObservable<int> Add(IObservable<int> wholeList) , а затем используйте стандартные операторы или Observable.Create<T>(...) внутри метода.

Ответ №2:

В качестве вашего комментария я бы посоветовал вам сделать observable для генерации элементов по мере необходимости и не делать этого после подписки. В вашем примере вы можете сделать что-то вроде:

 var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));

obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
  Console.WriteLine(t.Item1);
  SaveItems(t.Item2);
});