#azure #function #service #triggers #bus
Вопрос:
Я использую привязку триггера очереди служебной шины функций Azure, чтобы повторить свою службу, если возникнет какое-либо исключение.
У меня есть следующее в host.json
"extensions": {
"serviceBus": {
"messageHandlerOptions": {
"autoComplete": false
}
}
}
Моя ссылка на пакет
<ItemGroup>
<PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.15.0" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.5.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="3.1.20" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.13" />
</ItemGroup>
Но продолжайте получать
Майкрософт.Лазурь.Служебная шина: Предоставленная блокировка недействительна. Либо срок действия блокировки истек, либо сообщение уже было удалено из очереди, либо было получено другим экземпляром получателя.
Мой метод функции Azure
public async Task Run([ServiceBusTrigger("subprocess", Connection = "sb")] Message message,
MessageReceiver messageReceiver,
[ServiceBus("subprocess", Connection = "sb")] MessageSender sender,
ILogger log)
Вот полный код
public class BozaSubProcessServiceBusQueueTrigger
{
private const string RetryCountUserProperty = "retry-count";
private const string SubProcessException = "SubProcessException";
private const string OriginalSequenceNumber = "original-SequenceNumber";
private static readonly int RetryCount = 5;
private readonly BozaSubProcessFactory accountTransferProcessFactory;
public BozaSubProcessServiceBusQueueTrigger(BozaSubProcessFactory bozaProcessFactory)
{
this.BozaProcessFactory = bozaProcessFactory;
}
[FunctionName("BozaSubProcessServiceBusQueueTrigger")]
public async Task Run([ServiceBusTrigger("bozasubprocess", Connection = "sb")] Message message,
MessageReceiver messageReceiver,
[ServiceBus("bozasubprocess", Connection = "sb")] MessageSender sender,
ILogger log)
{
try
{
string body = Encoding.UTF8.GetString(message.Body);
//log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
if (this.accountTransferProcessFactory.GetBozaProcessService(body) != null)
{
log.LogInformation($"Boza Sub Process Invoked");
await this.accountTransferProcessFactory.GetBozaProcessService(body).ExecuteAsync(body);
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}
else
{
log.LogInformation("No any Boza Sub Process Invoked");
}
}
catch (Exception ex)
{
if (!message.UserProperties.ContainsKey(RetryCountUserProperty))
{
message.UserProperties[RetryCountUserProperty] = 0;
message.UserProperties[OriginalSequenceNumber] = message.SystemProperties.SequenceNumber;
}
if ((int)message.UserProperties[RetryCountUserProperty] < RetryCount)
{
Message retryMessage = message.Clone();
int retryCount = (int)message.UserProperties[RetryCountUserProperty] 1;
int interval = 5 * retryCount;
DateTimeOffset scheduleTime = DateTimeOffset.Now.AddSeconds(interval);
retryMessage.UserProperties[RetryCountUserProperty] = retryCount;
retryMessage.UserProperties[SubProcessException] = ex.Message;
await sender.ScheduleMessageAsync(retryMessage, scheduleTime);
await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
log.LogInformation($"Scheduling message retry {retryCount} to wait {interval} seconds and arrive at {scheduleTime.UtcDateTime}");
}
else
{
log.LogWarning(message.SystemProperties.LockToken);
log.LogCritical($"Exhausted all retries for message sequence {message.UserProperties["original-SequenceNumber"]}");
await messageReceiver.DeadLetterAsync(message.SystemProperties.LockToken, "Exhausted all retries");
}
}
}
}
Комментарии:
1. у вас есть try catch внутри вашего функционального кода? Может ли также включать в себя код функции?
2. Привет @ThiagoCustodio Я прикрепил код выше. Спасибо
Ответ №1:
Поскольку это исключение возникает как часть расширения служебной шины, вы не можете напрямую обработать ошибку (например, попробовать/поймать), но можете настроить политики повторных попыток (поверх повторной попытки служебной шины по умолчанию). Этот официальный документ более подробно описывает это.
Вместо того, чтобы выполнять повторную попытку в инструкции catch, как вы делаете, попробуйте использовать те, которые предоставляются SDK, которые, как я полагаю, обновляют блокировку:
Комментарии:
1. Привет @Thiago Custodio можете ли вы обработать событие до того, как сообщение попадет в очередь с мертвой строкой с помощью FixedDelayRetry??
2. это хороший вопрос. Я в этом не уверен. Вам лучше спросить об этом в репозитории azure service bus SDK на Github
3. Привет @Тьяго Кастодио, я добавил следующее [Исправлена ошибка(5, «00:00:10»)] но после 5 повторных попыток сообщение больше никогда не попадает в очередь мертвых писем. 😒