#message-queue #nservicebus
#блокировка #очередь сообщений #nservicebus
Вопрос:
У меня есть сценарий, в котором обработчик сообщений nservicebus мне нужен, чтобы предотвратить одновременное выполнение нескольких сообщений для одной и той же саги.
Обработчик для аргументов выполняет что-то вроде этого (способ упрощен в этом примере(
Сообщение:
public class MyMessage : IMessage {
public int OrderId {get;set;}
public int NewQuantityLevel {get;set;}
}
Сага:
public void Handle(MyMessage message)
{
// call remote service to get current order quantity
// do some logic and update remote service with difference between original and new quantity
Bus.Send(new MyOtherMessage())
}
Теперь я — это мой процесс, я могу получать 2 или более таких сообщений в любое время, и я не хочу, чтобы они извлекали количество заказа, которое, возможно, уже находится в процессе обновления или модификации где-то еще.
Я рассмотрел несколько решений:
- Получите мьютекс для заказа (в настоящее время у нас есть только один экземпляр worker, работающий на одной машине, но в будущем может быть несколько, и в этом случае мы могли бы использовать блокировку redis или что-то подобное)
- Используйте блокировку sql в службе, чтобы выполнить сериализованную блокировку строк / данных (однако не уверен, что это вообще сработает)
Ни один из них не кажется оптимальным, и мне кажется, что я работаю против фреймворка
Ответ №1:
Сага — это замок.
Как упоминает @Hadi, NServiceBus будет использовать оптимистичный параллелизм, чтобы гарантировать, что только одно сообщение получает обновление экземпляра saga за раз.
Вместо того чтобы выполнять обновление непосредственно в saga, сохраните тот факт, что вы выполняете обновление, и отправьте сообщение для выполнения удаленного вызова службы в отдельный обработчик сообщений в другой конечной точке. Сохранение факта в саге и отправка сообщения для его выполнения либо будут выполнены ОДНОВРЕМЕННО, либо не будут выполнены вообще. Если два сообщения попытаются сделать это одновременно, только одно завершится успешно. Другое сообщение получит исключение параллелизма, вернется в очередь и в конечном итоге будет повторено.
В это время он увидит, что уже выполняется операция обновления количества. Затем вы можете либо отменить второе сообщение, либо сохранить некоторое состояние в saga, чтобы убедиться, что второе обновление количества произойдет после завершения первого.
Перемещение вызова удаленной службы за пределы saga вместе с полнодуплексной передачей запросов / ответов обеспечивает хорошее разделение задач между saga как менеджером процесса и обработчиком сообщений как точкой интеграции.
Псевдокод
public class MySaga
{
public void Handle(MyMessage message)
{
if(Data.CurrentlyUpdatingQuantity)
return; //or schedule for later
Data.CurrentlyUpdatingQuantity = true;
Bus.Send(new PerformQuantityUpdateMessage(message.OrderId));
}
public void Handle(QuantityUpdateResponse message)
{
Data.CurrentlyUpdatingQuantity = false;
Bus.Send(new MyOtherMessage());
}
}
Отдельный обработчик сообщений (НЕ ЧАСТЬ САГИ)
public void Handle(PerformQuantityUpdateMessage message)
{
// call remote service to get current order quantity
// do some logic and update remote service with difference between original and new quantity
Bus.Reply(new QuantityUpdateResponse(message.OrderId));
}
Комментарии:
1. Должен ли я использовать отложенную доставку для изменения расписания сообщения или отслеживать его самостоятельно и проверять в ручном устройстве?
2. В этом случае я бы, вероятно, отследил это, сохранив операцию «Мне нужно выполнить другое количество обновлений» в данных saga. Затем первым делом проверьте этот флаг
Handle(QuantityUpdateResponse)
и при необходимости отправьте другойPerformQuantityUpdateMessage
. Таким образом, вы можете объединить несколько дополнительныхMyMessage
операций в одну операцию обновления количества и быть уверенным, что она будет выполнена, как только завершится начальная.3. Разве сообщения, которые появляются в то же время, не смогут обновлять данные saga, предполагая, что они относятся к одному и тому же уникальному ключу
4. Это должно работать нормально. Пока идентификатор заказа один и тот же, он всегда будет сопоставляться с одним и тем же экземпляром saga. (По крайней мере, пока вы явно не завершите сагу)
Ответ №2:
Вероятно, вам не следует делать это в своей саге:
// вызовите удаленную службу, чтобы узнать текущее количество заказа
Вместо этого это должно быть перенесено в отдельную конечную точку, с которой saga, как диспетчер процессов, взаимодействует в режиме полнодуплексного обмена сообщениями типа запрос / ответ.
Итак, когда saga получает это первое триггерное сообщение, которое запускает его, оно отправляет сообщение запроса в RemoteServiceInvocationEndpoint и обновляет свое состояние, чтобы указать, что оно ожидает ответа.
Конечная точка Remoteserviceinvocation после получения ответа от удаленной службы отправит ответное сообщение обратно в saga.
Когда saga получит это ответное сообщение, она будет знать, что процесс завершен, а затем выполнит любые необходимые конечные действия, такие как отправка других сообщений.
Если saga получает другое триггерное сообщение, она может проверить его состояние и увидеть, что она уже отправила сообщение с запросом и знает, что отправлять другое не следует.
Как сказал @Hadi в своем ответе, механизмы управления параллелизмом в NServiceBus гарантируют, что saga будет успешно обрабатывать только одно сообщение за раз.
Ответ №3:
Возможно ли создать вторичную сагу, поскольку обновление само по себе является длительным процессом? Когда цикл обновления завершает свою работу, он может сигнализировать о продолжении исходного цикла.
Что касается того, как NServiceBus обрабатывает параллельные саги, есть два случая:
-
При поступлении нескольких сообщений saga start будет зафиксировано только одно. Другие сообщения завершатся ошибкой и будут получены при повторных попытках. Со второй попытки сага уже существует, и второй экземпляр не создается. Это гарантирует, что будет создана только одна сага.
-
При одновременном доступе к saga (например, для обновления состояния) вступают в силу настройки параллелизма хранилища сохраняемости. При использовании RavenDB NServiceBus включает оптимистичную поддержку параллелизма.
Все это более подробно описано на этой странице документации NServiceBus.
Если вам нужно убедиться, что для каждого пакета существует только один экземпляр саги (например, если вы можете сопоставить сагу, скажем, с идентификатором продукта, который вы хотите заблокировать), вы можете использовать это в качестве идентификатора корреляции, чтобы в пакете существовала только одна сага.
Если вам нужен только один экземпляр saga (больше похожий на одноэлементный saga), вы можете использовать логику корреляции без операций, а также пользовательский saga finder. Таким образом, вы все еще можете масштабировать конечную точку, и другие обработчики / саги не пострадают. Этот метод показан здесь.