Обработчик событий для реактивных расширений, где обработчик событий является временным образом жизни

#c# #system.reactive

#c# #system.reactive

Вопрос:

Мне нужно соединить мою платформу обмена сообщениями, чтобы при возникновении события (TimeAggregate, которое содержит элементы с именами Reading и Stream ) я мог переходить в реактивные расширения и делать необычные вещи:

 public class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
    public TestHandler(/* singleton variables are injected here */)
    {
    }

    public async Task Handle(TimeAggregate notification, string topic, CancellationToken cancellationToken)
    {
        // notification contains TimeAggregate.Reading (which is a decimal)
        // and TimeAggregate.Stream (which is a string, i.e. Office1, OfficeA etc)
        // I want to perform an average on TimeAggregate.Reading but split by TimeAggregate.Reading
    }
}
  

Как бы я связал эти два (обработчик событий и реактивное расширение) и разделил, чтобы каждое имя чтения усреднялось независимо (своего рода словарь средних наблюдаемых значений) за единицу времени.Ключ потока?

Подробности о сроке службы

Также существует технический аспект, когда каждый раз, когда вызывается обработчик событий ( TestHandler ), он создается с нуля, время жизни TestHandler является временным.

Я могу обойти это, создав менеджер статической регистрации (который IoC вводит в конструктор TestHandler, или мы буквально ссылаемся на него как на статический внутри).

Комментарии:

1. Похоже, вы хотите внедрить интерфейс с одноэлементным временем жизни, использовать Subject для сбора данных, а затем использовать IObservable.Scan для агрегирования переменных, необходимых для текущего среднего значения (total и count) для каждого ключа. Затем вы можете использовать Select для уменьшения до среднего. Обратите внимание, ваш вопрос неясен, потому что вы описываете TimeAggregate.Reading как строку, так и десятичную дробь.

Ответ №1:

Один из подходов заключается в внедрении службы с одноэлементным временем жизни, которое может быть вызвано в обработчике событий. Он может использовать Subject для генерации наблюдаемого и предоставления / использования наблюдаемого для выполнения средних сообщений:

 public class MonitoringService : IMonitoringService
{
    private Subject<TimeAggregate> _subject;

    // Calculate tuples of (key, average)
    public IObservable<(string, decimal)> Averages => _subject
        // Group by key
        .GroupBy(s => s.Group)
        .SelectMany(g => g
            .Select(g => g.Reading)
            // Collect element count and running total
            .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements   1, agg.total   v))
            // Calculate running average
            .Select(t => t.total / t.elements)
            // Associate key and average for SelectMany
            .Select(average => (g.Key, average)));

    public MonitoringService()
    {
        _subject = new Subject<TimeAggregate>();
    }
    public void PostNotification(TimeAggregate notification)
    {
        _subject.OnNext(notification);
    }
}
  

Который затем можно использовать следующим образом:

 var monitoringService = new MonitoringService();
var handler = new TestHandler(monitoringService);

var dict = new Dictionary<string, decimal>();
monitoringService.Averages.Subscribe<(string group, decimal average)>(t =>
{
    // Do something with running average. In this case populate a dictionary
    dict[t.group] = t.average;
});
  

словарь средних значений

Примечание: если вы хотите начать вычислять средние значения до того, как наблюдатель будет подключен, вы можете заглянуть в подключаемые устройства ( Publish и т.д.), И вы хотели бы избежать использования такого свойства, генерирующего только получение, и вместо этого назначить его один раз в конструкторе. Но помните, что этот подход будет отправлять обновления только для средних значений, поэтому, если сообщение не является болтливым, среднее сообщение будет отложено.
например

 public class MonitoringService : IMonitoringService
{
    private Subject<TimeAggregate> _subject;

    // Calculate tuples of (key, average)
    public IObservable<(string, decimal)> Averages { get; }

    public MonitoringService()
    {
        _subject = new Subject<TimeAggregate>();
        Averages = _subject
        // Group by key
        .GroupBy(s => s.Group)
        .SelectMany(g => g
            .Select(g => g.Reading)
            // Collect element count and running total
            .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements   1, agg.total   v))
            // Calculate running average
            .Select(t => t.total / t.elements)
            // Associate key and average for SelectMany
            .Select(average => (g.Key, average)))
        .Publish()
        // Connect immediately
        .AutoConnect(0);
    }
    public void PostNotification(TimeAggregate notification)
    {
        _subject.OnNext(notification);
    }
}
  

Полный пример:

 using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        public static async Task Main()
        {
            var monitoringService = new MonitoringService();
            var handler = new TestHandler(monitoringService);

            var dict = new Dictionary<string, decimal>();
            monitoringService.Averages.Subscribe<(string group, decimal average)>(t =>
            {
                // Do something with running average. In this case populate a dictionary
                dict[t.group] = t.average;
            });

            await handler.Handle(new TimeAggregate
            {
                Group = "Test1",
                Reading = 100
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test1",
                Reading = 200
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test2",
                Reading = 200
            }, "Test", CancellationToken.None);

            await handler.Handle(new TimeAggregate
            {
                Group = "Test2",
                Reading = 300
            }, "Test", CancellationToken.None);
        }  
    }

    public class MonitoringService : IMonitoringService
    {
        private Subject<TimeAggregate> _subject;

        // Calculate tuples of (key, average)
        public IObservable<(string, decimal)> Averages => _subject
            // Group by key
            .GroupBy(s => s.Group)
            .SelectMany(g => g
                .Select(g => g.Reading)
                // Collect element count and running total
                .Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements   1, agg.total   v))
                // Calculate running average
                .Select(t => t.total / t.elements)
                // Associate key and average for SelectMany
                .Select(average => (g.Key, average)));

        public MonitoringService()
        {
            _subject = new Subject<TimeAggregate>();
        }
        public void PostNotification(TimeAggregate notification)
        {
            _subject.OnNext(notification);
        }
    }

    public class TestHandler : ITopicNotificationHandler<TimeAggregate>
    {
        private readonly IMonitoringService _monitoringService;

        public TestHandler(IMonitoringService monitoringService)
        {
            _monitoringService = monitoringService;
        }

        public Task Handle(TimeAggregate notification, string topic, CancellationToken cancellationToken)
        {
            _monitoringService.PostNotification(notification);
            return Task.CompletedTask;
        }
    }

    public interface IMonitoringService
    {
        void PostNotification(TimeAggregate notification);
        IObservable<(string group, decimal average)> Averages { get; }
    }

    public class TimeAggregate
    {
        public string Group { get; set; }
        public decimal Reading { get; set; }
    }

    public interface ITopicNotificationHandler<T>
    {
    }
}
  

Комментарии:

1. Большое вам спасибо, Джейсон, это фантастика, может быть, есть какая-то работа, если интересно, можете ли вы указать контактные данные в профиле, чтобы я мог связаться?