Массовая фильтрация сообщений, которые может обрабатывать потребитель

#.net-core #filter #autofac #masstransit

#.net-ядро #Фильтр #автофас #masstransit

Вопрос:

У меня есть общий интерфейс сообщений, подобный этому:

 public interface IMyMessage
{
     int EventCode {get;}
}
 

Теперь у меня есть несколько потребителей, обрабатывающих это сообщение:

 public class MyConsumer1: IConsumer<IMyMessage>{...}
public class MyConsumer2: IConsumer<IMyMessage>{...}
 

Я хочу, чтобы MyConsumer1 обрабатывал только те сообщения, где EventCode==1, и чтобы MyConsumer2 обрабатывал все сообщения, где EventCode==2.

Я знаю, что могу выполнить оператор if в методе Consume, но хотите знать, есть ли лучший способ, например, какой-нибудь фильтр маршрутизации?

Моим предпочтительным способом было бы создать атрибут ie. Обрабатывает атрибут eventcodeattribute(1) и применяет его к потребителям.

Я также использую интеграцию контейнера Autofac с MassTransit.

Пожалуйста, помогите.

Спасибо

Ответ №1:

Прежде чем я дам какой-либо ответ на фактический вопрос, я хотел бы спросить, почему вы используете один и тот же тип сообщения со свойством, чтобы определить, какие потребители на самом деле потребляют сообщение. Доступны лучшие (более эффективные) методы, такие как использование ПРЯМОГО обмена с RabbitMQ.

Вы могли бы создать свой собственный атрибут и создать фильтр промежуточного программного обеспечения, который просматривал бы потребителя, проверял, есть ли у него пользовательский атрибут, а затем использовал значение этого атрибута для проверки сообщения и фильтрации его, если потребитель в нем не заинтересован.

Полный рабочий образец показан ниже:

Сначала создайте атрибут.

 class EventCodeAttribute :
    Attribute
{
    public int EventCode { get; }

    public EventCodeAttribute(int eventCode)
    {
        EventCode = eventCode;
    }
}
 

И тип сообщения:

 interface IEventMessage
{
    int EventCode { get; }
}
 

Фильтр промежуточного программного обеспечения:

 class EventCodeFilter<TConsumer> :
    IFilter<ConsumerConsumeContext<TConsumer, IEventMessage>>
    where TConsumer : class
{
    readonly int _eventCode;

    public EventCodeFilter()
    {
        var attribute = typeof(TConsumer).GetCustomAttribute<EventCodeAttribute>();
        if (attribute == null)
            throw new ArgumentException("Message does not have the attribute required");

        _eventCode = attribute.EventCode;
    }

    public async Task Send(ConsumerConsumeContext<TConsumer, IEventMessage> context, IPipe<ConsumerConsumeContext<TConsumer, IEventMessage>> next)
    {
        if (context.Message.EventCode.Equals(_eventCode))
        {
            await next.Send(context);
        }
    }

    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("eventCode");
        scope.Add("code", _eventCode);
    }
}
 

Образец потребителя:

 [EventCode(27)]
class EventCodeConsumer :
    IConsumer<IEventMessage>
{
    public async Task Consume(ConsumeContext<IEventMessage> context)
    {
    }
}
 

Наконец, настройте потребителя на использование фильтра:

 builder.AddMassTransit(cfg =>
{
    cfg.AddConsumer<EventCodeConsumer>(x =>
        x.ConsumerMessage<IEventMessage>(p => p.UseFilter(new EventCodeFilter<EventCodeConsumer>())));
});
 

Комментарии:

1. Спасибо за ответ. Это именно то, что мне нужно. Теперь, что касается Вашего вопроса об одном сообщении… Я не контролирую издателя, издатель считывает данные с контроллера ПЛК, используя объединение, а затем для каждого считывания данных он публикует сообщение с кодом события, которое он прочитал с ПЛК.

2. Ах, да, полностью понимаю.