Почему мне НЕ нужно публиковать на этом холодном наблюдаемом?

#c# #system.reactive #reactive-programming #observable

#c# #system.reactive #реактивное программирование #observable

Вопрос:

Поскольку у меня здесь простуда Observable и я несколько раз подписываюсь на «сгруппированный», почему мне не нужна публикация здесь? Я бы ожидал, что при запуске он выдаст нежелательные результаты, но, к моему удивлению, он работает как с Publish, так и без Publish. Почему это?

 var subject = new List<string>
    {                            
    "test",                        
    "test",                 
    "hallo",
    "test",
    "hallo"
    }.ToObservable();
subject
    .GroupBy(x => x)
    .SelectMany(grouped => grouped.Scan(0, (count, _) =>   count)
         .Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
    .Subscribe(result => Console.WriteLine("You typed {0} {1} times", 
         result.Chars, result.Count));

// I Would have expect that I need to use Publish like that
//subject
//   .GroupBy(x => x)
//   .SelectMany(grouped => grouped.Publish(sharedGroup => 
//       sharedGroup.Scan(0, (count, _) =>   count)
//       .Zip(sharedGroup, (count, chars) => 
//           new { Chars = chars, Count = count })))
//   .Subscribe(result => Console.WriteLine("You typed {0} {1} times", 
//       result.Chars, result.Count));

Console.ReadLine();
  

Редактировать

Как заметил Пол, поскольку мы дважды подписываемся на базовый холодный observable, нам следует дважды просмотреть последовательность. Однако мне не повезло сделать этот эффект видимым. Я пытался вставить строки отладки, но, например, это выводит «выполнение» только один раз.

 var subject = new List<Func<string>>
{                            
() =>
    {
        Console.WriteLine("performing");
        return "test";
    },                        
() => "test",                 
() => "hallo",
() => "test",
() => "hallo"
}.ToObservable();


subject
    .Select(x => x())
    .GroupBy(x => x)
    .SelectMany(grouped => grouped.Scan(0, (count, _) =>   count)
            .Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
    .Subscribe(result => Console.WriteLine("You typed {0} {1} times",
            result.Chars, result.Count));
  

Интересно, можем ли мы сделать видимым эффект, что мы имеем дело с холодной наблюдаемой и не используем Publish() . На другом шаге я хотел бы посмотреть, как Publish() (см. Выше) устраняется эффект.

РЕДАКТИРОВАТЬ 2

Как предложил Пол, я создал пользовательский IObservable<string> для целей отладки. Однако, если вы установите точку останова в этом Subscribe() методе, вы заметите, что она будет нажата один раз.

 class Program
{
    static void Main(string[] args)
    {
        var subject = new MyObservable();

        subject
            .GroupBy(x => x)
            .SelectMany(grouped => grouped.Scan(0, (count, _) =>   count)
                 .Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
            .Subscribe(result => Console.WriteLine("You typed {0} {1} times",
                 result.Chars, result.Count));

       Console.ReadLine();
   }
}

class MyObservable : IObservable<string>
{
    public IDisposable Subscribe(IObserver<string> observer)
    {
        observer.OnNext("test");
        observer.OnNext("test");
        observer.OnNext("hallo");
        observer.OnNext("test");
        observer.OnNext("hallo");
        return Disposable.Empty;
    }
}
  

Итак, для меня вопрос все еще открыт. Почему мне не нужна Publish здесь, на этом холодном Observable ?

Ответ №1:

Вы используете свой источник на основе списка только один раз, поэтому вы не увидите повторяющихся эффектов подписки там. Ключом к ответу на ваш вопрос является следующее наблюдение:

Объект IGroupedObservable<K, T>, вытекающий из GroupBy сам по себе, является скрытым объектом.

Внутренне GroupBy хранит словарь<K, ISubject<T>> . Всякий раз, когда приходит сообщение, оно отправляется в тему с соответствующим ключом. Вы дважды подписываетесь на объект группировки, что безопасно, поскольку субъект отделяет производителя от потребителя.

Комментарии:

1. Потрясающий Барт! Я думаю, что это близко к тому, что я хотел сказать в своем последнем комментарии к ответу Паулса. Спасибо, что разъяснили! Кстати, здорово, что ты наконец-то на SO. Надеюсь скоро увидеть вас и в Twitter 🙂 Люблю ваши видео. Зажигайте!

Ответ №2:

Повторное использование ‘grouped’ в Zip означает, что вы эффективно выполняете каждую группировку дважды — однако, поскольку ваш исходный код холодный, он все еще работает. Имеет ли это смысл?

Комментарии:

1. Можем ли мы каким-то образом сделать видимым эффект «выполнения группировки дважды»? Я попытался вставить Do (), чтобы посмотреть, группируется ли он несколько раз, но безуспешно. Кроме того, если это действительно так, почему сканирование не дублирует подсчеты? Вопрос в том, можем ли мы сделать видимым какой-либо эффект, который показывает, как это вычисление отличается на горячих или холодных наблюдаемых? И далее, можем ли мы показать, как можно избежать внутреннего дублирования (если оно существует) с помощью Publish?

2. Подумайте о том, что Zip-оператор собирается делать в своей реализации — он собирается подписаться на результат SelectMany (в.GroupBy.Scan), и он также собирается подписаться на ‘grouped’ (в.GroupBy). Есть 2 подписки на один и тот же ввод, так что в конечном итоге вы получите две копии ‘input’ (поскольку он холодный).

3. Можете ли вы отредактировать свой ответ таким образом, чтобы показать мне в коде, как сделать этот эффект видимым? Например, вставка Do (), запуск GUID или s.th. Я умоляю вас сделать это. Я действительно дергаю себя за волосы по этому поводу. Клянусь, я делаю этот ответ принятым и тогда прекращаю доставать вас по этому поводу 😉

4. Создайте пользовательский IObservable, который оборачивает ваш массив, затем установите точку останова в Subscribe 🙂

5. Сделал это. Это всего лишь один вызов Subscribe() (см. Обновленный вопрос). Может ли на самом деле быть так, что оператор GroupBy () заботится о том, чтобы сделать холодный наблюдаемый объект горячим, поэтому вы можете подписаться на результат оператора GroupBy несколько раз, даже если ваш источник на самом деле холодный? Имеет ли это смысл?