Производитель / потребитель в обработчике команд NServiceBus

#c# #.net #nservicebus #producer-consumer

#c# #.net #nservicebus #производитель-потребитель

Вопрос:

Поскольку NServiceBus, похоже, не поддерживает добавление механизма приоритета в очередь сообщений, я хочу реализовать это сам.

  • Обработчик команд (производитель):
 public void Handle(DoAnAction message)
{
  _messageManager.Insert(message);
}
  
  • Один потребитель (другой поток):
 public void Run()
{
  DoAnAction message;
  while (true) 
  {
    if (_messageManager.TryDequeue(out message)) 
    {
      doALongCall(message);
    }
    else 
    {
      Thread.sleep(200);
    }
  }
}
  

Это вообще хорошая идея для начала? Мне не нравится идея, что я могу потерять сообщения таким образом.

Обновление: вариант использования: у нас есть много клиентов, которые могут отправлять сообщение DoAnAction. Обработка этого действия занимает некоторое время. Проблема в том, что когда 1 клиент решает отправить 200 DoAnActions, всем остальным клиентам приходится ждать 2-3 часа, чтобы все эти сообщения могли быть обработаны (FIFO). Вместо этого я хотел бы обрабатывать эти сообщения в порядке, основанном на клиенте.

Таким образом, даже если у клиента A все еще есть 200 сообщений для обработки, когда клиент B отправляет сообщение, оно будет помещено в очередь следующим. Если бы клиент B отправил 3 сообщения, очередь выглядела бы так: B-A, B-A, B-A, A, A, A, …

Комментарии:

1. Вы можете использовать упорядочение обработчиков .

2. @DmytroMukalov Здесь важен не порядок обработки обработчиков, а порядок, в котором (однотипные) сообщения обрабатываются в первую очередь. Я отредактировал вопрос.

3. В этом случае вам определенно понадобится некоторое промежуточное хранилище (с произвольным доступом) для сообщений, чтобы определить вашу собственную стратегию справедливости.

4. @DmytroMukalov вот как я это реализую прямо сейчас. В этом промежуточном хранилище нет постоянного хранилища (списка и активной очереди), однако это все равно может быть добавлено на более позднем этапе.

Ответ №1:

Часто я обнаруживал, что реальная причина для этого исходит из бизнес-точки зрения. В этом случае имеет смысл смоделировать его в коде, чтобы отразить правила и положения, касающиеся бизнеса.

Представьте, что у вас есть SendEmail сообщение, но вы хотите «расставить приоритеты» для некоторых сообщений в зависимости от клиента, для которого оно предназначено. Вы можете спроектировать свою систему по-другому, чтобы у вас было два типа сообщений: одно обычное SendEmail и одно вызываемое SendPriorityEmail , которое отправляется в другую конечную точку / очередь. Вам нужно будет определить в своем коде, какое сообщение отправлять.

Разделение сообщений в корне означает, что у вас больше гибкости (которая также исходит от бизнеса), что полезно при выполнении мониторинга, соглашений об уровне обслуживания и качества обслуживания, когда речь идет о более важных клиентах (в данном случае).

Комментарии:

1. Я согласен, что модель должна отражать все бизнес-правила. Я думаю, что наличие 2 очередей не будет отражать это. Не существует типа сообщения, которое получает приоритет. Приоритет определяется текущим отставанием сообщений, если вы понимаете. К каждому клиенту относятся одинаково, очень 2019 😉

2. Не уверен, что вы подразумеваете под приоритетом, определяемым отставанием сообщений, не могли бы вы уточнить?

3. Я внес обновление в свой вопрос. По сути, я хочу решить ситуацию, когда 200 сообщений от клиента A ожидают обработки на шине, а клиент B / C / D также начинает добавлять сообщения. Я не хочу сначала обрабатывать все сообщения от клиента A, я хочу обработать: «ABCD ABCD …»

4. Приоритетная очередь не поддерживается большинством базовых технологий, и из-за характера «очереди» большей части транспорта это невозможно. Возможно, вы захотите ознакомиться с этой проблемой, которая касается той же проблемы (хотя вокруг транспорта RabbitMQ, который имеет некоторый уровень поддержки)

Ответ №2:

Вы можете использовать Sagas для выполнения того, что вы ищете, по сути, имея один экземпляр saga на идентификатор клиента. Сага служит «контрольной точкой» и может гарантировать, что для каждого клиента одновременно обрабатывается только N сообщений.

Это может снизить пропускную способность ниже максимальной при более низких уровнях нагрузки, но может привести к более «справедливому» распределению, которого вы пытаетесь достичь.

Имеет ли это смысл?

Комментарии:

1. Я понимаю саги. Однако я не понимаю, как это было бы хорошо, не могли бы вы уточнить? Я обновил вопрос дополнительной информацией.

2. Я был бы рад позвонить и обсудить, напишите мне по адресу sean.farmar@particular.net

3. Добавлено больше подробностей об использовании sagas — надеюсь, это поможет.

4. Спасибо @UdiDahan, это действительно быстрая победа, но упомянутый недостаток при низком уровне нагрузки в данном случае был неприемлемым.

5. @andy Поскольку это «быстрая победа», я предлагаю вам сделать быстрый скачок этого подхода, прежде чем отклонять его по соображениям производительности. Я не думаю, что вы окажете какое-либо существенное влияние на пользователя при низкой нагрузке.