#.net-core #filter #autofac #masstransit
#.net-ядро #Фильтр #автофас #masstransit
Вопрос:
У меня есть общий интерфейс сообщений, подобный этому:
public interface IMyMessage
{
int EventCode {get;}
}
Теперь у меня есть несколько потребителей, обрабатывающих это сообщение:
public class MyConsumer1: IConsumer<IMyMessage>{...}
public class MyConsumer2: IConsumer<IMyMessage>{...}
Я хочу, чтобы MyConsumer1 обрабатывал только те сообщения, где EventCode==1, и чтобы MyConsumer2 обрабатывал все сообщения, где EventCode==2.
Я знаю, что могу выполнить оператор if в методе Consume, но хотите знать, есть ли лучший способ, например, какой-нибудь фильтр маршрутизации?
Моим предпочтительным способом было бы создать атрибут ie. Обрабатывает атрибут eventcodeattribute(1) и применяет его к потребителям.
Я также использую интеграцию контейнера Autofac с MassTransit.
Пожалуйста, помогите.
Спасибо
Ответ №1:
Прежде чем я дам какой-либо ответ на фактический вопрос, я хотел бы спросить, почему вы используете один и тот же тип сообщения со свойством, чтобы определить, какие потребители на самом деле потребляют сообщение. Доступны лучшие (более эффективные) методы, такие как использование ПРЯМОГО обмена с RabbitMQ.
Вы могли бы создать свой собственный атрибут и создать фильтр промежуточного программного обеспечения, который просматривал бы потребителя, проверял, есть ли у него пользовательский атрибут, а затем использовал значение этого атрибута для проверки сообщения и фильтрации его, если потребитель в нем не заинтересован.
Полный рабочий образец показан ниже:
Сначала создайте атрибут.
class EventCodeAttribute :
Attribute
{
public int EventCode { get; }
public EventCodeAttribute(int eventCode)
{
EventCode = eventCode;
}
}
И тип сообщения:
interface IEventMessage
{
int EventCode { get; }
}
Фильтр промежуточного программного обеспечения:
class EventCodeFilter<TConsumer> :
IFilter<ConsumerConsumeContext<TConsumer, IEventMessage>>
where TConsumer : class
{
readonly int _eventCode;
public EventCodeFilter()
{
var attribute = typeof(TConsumer).GetCustomAttribute<EventCodeAttribute>();
if (attribute == null)
throw new ArgumentException("Message does not have the attribute required");
_eventCode = attribute.EventCode;
}
public async Task Send(ConsumerConsumeContext<TConsumer, IEventMessage> context, IPipe<ConsumerConsumeContext<TConsumer, IEventMessage>> next)
{
if (context.Message.EventCode.Equals(_eventCode))
{
await next.Send(context);
}
}
public void Probe(ProbeContext context)
{
var scope = context.CreateFilterScope("eventCode");
scope.Add("code", _eventCode);
}
}
Образец потребителя:
[EventCode(27)]
class EventCodeConsumer :
IConsumer<IEventMessage>
{
public async Task Consume(ConsumeContext<IEventMessage> context)
{
}
}
Наконец, настройте потребителя на использование фильтра:
builder.AddMassTransit(cfg =>
{
cfg.AddConsumer<EventCodeConsumer>(x =>
x.ConsumerMessage<IEventMessage>(p => p.UseFilter(new EventCodeFilter<EventCodeConsumer>())));
});
Комментарии:
1. Спасибо за ответ. Это именно то, что мне нужно. Теперь, что касается Вашего вопроса об одном сообщении… Я не контролирую издателя, издатель считывает данные с контроллера ПЛК, используя объединение, а затем для каждого считывания данных он публикует сообщение с кодом события, которое он прочитал с ПЛК.
2. Ах, да, полностью понимаю.