#c# #system.reactive
#c# #system.reactive
Вопрос:
Мне нужно соединить мою платформу обмена сообщениями, чтобы при возникновении события (TimeAggregate, которое содержит элементы с именами Reading
и Stream
) я мог переходить в реактивные расширения и делать необычные вещи:
public class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
public TestHandler(/* singleton variables are injected here */)
{
}
public async Task Handle(TimeAggregate notification, string topic, CancellationToken cancellationToken)
{
// notification contains TimeAggregate.Reading (which is a decimal)
// and TimeAggregate.Stream (which is a string, i.e. Office1, OfficeA etc)
// I want to perform an average on TimeAggregate.Reading but split by TimeAggregate.Reading
}
}
Как бы я связал эти два (обработчик событий и реактивное расширение) и разделил, чтобы каждое имя чтения усреднялось независимо (своего рода словарь средних наблюдаемых значений) за единицу времени.Ключ потока?
Подробности о сроке службы
Также существует технический аспект, когда каждый раз, когда вызывается обработчик событий ( TestHandler
), он создается с нуля, время жизни TestHandler является временным.
Я могу обойти это, создав менеджер статической регистрации (который IoC вводит в конструктор TestHandler, или мы буквально ссылаемся на него как на статический внутри).
Комментарии:
1. Похоже, вы хотите внедрить интерфейс с одноэлементным временем жизни, использовать
Subject
для сбора данных, а затем использоватьIObservable.Scan
для агрегирования переменных, необходимых для текущего среднего значения (total и count) для каждого ключа. Затем вы можете использоватьSelect
для уменьшения до среднего. Обратите внимание, ваш вопрос неясен, потому что вы описываетеTimeAggregate.Reading
как строку, так и десятичную дробь.
Ответ №1:
Один из подходов заключается в внедрении службы с одноэлементным временем жизни, которое может быть вызвано в обработчике событий. Он может использовать Subject
для генерации наблюдаемого и предоставления / использования наблюдаемого для выполнения средних сообщений:
public class MonitoringService : IMonitoringService
{
private Subject<TimeAggregate> _subject;
// Calculate tuples of (key, average)
public IObservable<(string, decimal)> Averages => _subject
// Group by key
.GroupBy(s => s.Group)
.SelectMany(g => g
.Select(g => g.Reading)
// Collect element count and running total
.Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements 1, agg.total v))
// Calculate running average
.Select(t => t.total / t.elements)
// Associate key and average for SelectMany
.Select(average => (g.Key, average)));
public MonitoringService()
{
_subject = new Subject<TimeAggregate>();
}
public void PostNotification(TimeAggregate notification)
{
_subject.OnNext(notification);
}
}
Который затем можно использовать следующим образом:
var monitoringService = new MonitoringService();
var handler = new TestHandler(monitoringService);
var dict = new Dictionary<string, decimal>();
monitoringService.Averages.Subscribe<(string group, decimal average)>(t =>
{
// Do something with running average. In this case populate a dictionary
dict[t.group] = t.average;
});
Примечание: если вы хотите начать вычислять средние значения до того, как наблюдатель будет подключен, вы можете заглянуть в подключаемые устройства ( Publish
и т.д.), И вы хотели бы избежать использования такого свойства, генерирующего только получение, и вместо этого назначить его один раз в конструкторе. Но помните, что этот подход будет отправлять обновления только для средних значений, поэтому, если сообщение не является болтливым, среднее сообщение будет отложено.
например
public class MonitoringService : IMonitoringService
{
private Subject<TimeAggregate> _subject;
// Calculate tuples of (key, average)
public IObservable<(string, decimal)> Averages { get; }
public MonitoringService()
{
_subject = new Subject<TimeAggregate>();
Averages = _subject
// Group by key
.GroupBy(s => s.Group)
.SelectMany(g => g
.Select(g => g.Reading)
// Collect element count and running total
.Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements 1, agg.total v))
// Calculate running average
.Select(t => t.total / t.elements)
// Associate key and average for SelectMany
.Select(average => (g.Key, average)))
.Publish()
// Connect immediately
.AutoConnect(0);
}
public void PostNotification(TimeAggregate notification)
{
_subject.OnNext(notification);
}
}
Полный пример:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
public static async Task Main()
{
var monitoringService = new MonitoringService();
var handler = new TestHandler(monitoringService);
var dict = new Dictionary<string, decimal>();
monitoringService.Averages.Subscribe<(string group, decimal average)>(t =>
{
// Do something with running average. In this case populate a dictionary
dict[t.group] = t.average;
});
await handler.Handle(new TimeAggregate
{
Group = "Test1",
Reading = 100
}, "Test", CancellationToken.None);
await handler.Handle(new TimeAggregate
{
Group = "Test1",
Reading = 200
}, "Test", CancellationToken.None);
await handler.Handle(new TimeAggregate
{
Group = "Test2",
Reading = 200
}, "Test", CancellationToken.None);
await handler.Handle(new TimeAggregate
{
Group = "Test2",
Reading = 300
}, "Test", CancellationToken.None);
}
}
public class MonitoringService : IMonitoringService
{
private Subject<TimeAggregate> _subject;
// Calculate tuples of (key, average)
public IObservable<(string, decimal)> Averages => _subject
// Group by key
.GroupBy(s => s.Group)
.SelectMany(g => g
.Select(g => g.Reading)
// Collect element count and running total
.Scan((elements: 0, total: (decimal)0), (agg, v) => (agg.elements 1, agg.total v))
// Calculate running average
.Select(t => t.total / t.elements)
// Associate key and average for SelectMany
.Select(average => (g.Key, average)));
public MonitoringService()
{
_subject = new Subject<TimeAggregate>();
}
public void PostNotification(TimeAggregate notification)
{
_subject.OnNext(notification);
}
}
public class TestHandler : ITopicNotificationHandler<TimeAggregate>
{
private readonly IMonitoringService _monitoringService;
public TestHandler(IMonitoringService monitoringService)
{
_monitoringService = monitoringService;
}
public Task Handle(TimeAggregate notification, string topic, CancellationToken cancellationToken)
{
_monitoringService.PostNotification(notification);
return Task.CompletedTask;
}
}
public interface IMonitoringService
{
void PostNotification(TimeAggregate notification);
IObservable<(string group, decimal average)> Averages { get; }
}
public class TimeAggregate
{
public string Group { get; set; }
public decimal Reading { get; set; }
}
public interface ITopicNotificationHandler<T>
{
}
}
Комментарии:
1. Большое вам спасибо, Джейсон, это фантастика, может быть, есть какая-то работа, если интересно, можете ли вы указать контактные данные в профиле, чтобы я мог связаться?