Лазурь.Обмен сообщениями.ServiceBus — ProcessMessageAsync прекращает обработку, а затем возобновляется через некоторое время

#azure-servicebus-subscriptions

#azure-служебный автобус-подписки

Вопрос:

Более подробное описание: У меня есть один потребитель с MaxConcurrentCalls=40. Потребитель ServiceBusProcessor установит длительную задержку (60 секунд) при обработке одного сообщения после каждого сообщения NumMessagesProcessedBeforeDelay.

В зависимости от размера MaxPrefetchCount (5/10/15) обработка всех сообщений прекращается после (3/5/6) количества сообщений с длительной задержкой, соответственно.

Когда мои компьютерные часы достигнут 1 минуты после отображаемого времени, они снова начнут обработку. Это может продолжаться или прекращаться в зависимости от следующего показа.

Обработка ожиданий должна продолжаться для других (MaxConcurrentCalls — потоки с задержкой)

Дополнительный тест Когда я писал это, у меня возникла дополнительная мысль отключить предварительную выборку MaxPrefetchCount = 0, и, как ни странно, это не только не останавливает обработку, но и на самом деле кажется быстрее.

Я спрашиваю, есть ли у кого-нибудь какие-либо мысли о том, какова корреляция между количеством предварительной выборки и количеством отложенных сообщений, относительно того, почему обработка останавливается или почему в целом обработка сообщений останавливается, когда у меня есть 35 одновременных вызовов, доступных для запуска.

исходный код

Ниже приведен полный исходный код для воспроизведения проблемы. Вам нужно будет создать пространство имен ASB и обновить строку подключения. Все остальное будет создано для вас.

 // // .NET Core 3.1 // Azure.Messaging.ServiceBus v7.5.0 // Service Bus Namespace Subscription: Development / Standard // Visual Studio 2019 - Version 16.10.3 //  using System; using System.Collections.Concurrent; using System.Text; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration;   namespace MyDemo {  class Program  {  const string connectionString = "Endpoint=sb://....";   private readonly ServiceBusAdministrationClient adminClient;  private readonly ServiceBusClient publisherClient;  private readonly ServiceBusClient subscriberClient;   private readonly ServiceBusSender publisher;  private ServiceBusProcessor consumer;   const string messagePipeName = "MyTestTopic";    //  // Message Creation  //  const int NumOfMessages = 10_000;  const int NumOfMessagesPerLoop = 100;   //  // Message Processing  //  const int MaxConcurrentCalls = 40;  const int NumMessagesProcessedBeforeDelay = 150; // 230 = threshold of continuous run MCC = 40 MPC = 15    //const int MaxPrefetchCount = 0; const int lineBreak = 5;  //const int MaxPrefetchCount = 5; const int lineBreak = 3;  //const int MaxPrefetchCount = 10; const int lineBreak = 5;  const int MaxPrefetchCount = 15; const int lineBreak = 6;    static async Task Main(string[] args)  {  //  // Setup  //  var program = new Program();  await program.CreateMessagePipe();  await program.CreateSubscriber();    Console.WriteLine("Press Enter To Continue...");   //  // Send Messages  //  await Task.Delay(5_000); // wait for Subscriber to be ready before sending messages  await program.PublishMessages();   Console.ReadLine();  }   public Program()  {  adminClient = new ServiceBusAdministrationClient(connectionString);   publisherClient = new ServiceBusClient(connectionString);  publisher = publisherClient.CreateSender(messagePipeName);    var options = new ServiceBusReceiverOptions();  subscriberClient = new ServiceBusClient(connectionString);  subscriberClient.CreateReceiver(messagePipeName, messagePipeName, options);  }    //  // Message Pipe  //   public async Task CreateMessagePipe()  {  if (await MessagePipeExists(messagePipeName))  return;   var options = new CreateTopicOptions(messagePipeName);   await adminClient.CreateTopicAsync(options);  }   public async Tasklt;boolgt; MessagePipeExists(string messagePipeName)  {  var response = await adminClient.TopicExistsAsync(messagePipeName);  return response.Value;  }    //  // Publisher  //   public async Task PublishMessages()  {  var options = new ParallelOptions() { MaxDegreeOfParallelism = 10 };   int numLoops = (NumOfMessages / NumOfMessagesPerLoop);   for (int loop = 0; loop lt; numLoops; loop  )  {  var tasks = new ConcurrentBaglt;Taskgt;();   Parallel.For(0, NumOfMessagesPerLoop, options, (i) =gt;  {  tasks.Add(SendMessage());  });   await Task.WhenAll(tasks);  }  }   int counter = 0;  public Task SendMessage()  {  return Task.Run(async () =gt;  {  var localCounter = Interlocked.Increment(ref counter);   var text = $"Some Random Message: {localCounter:D8}";   var message = new ServiceBusMessage(text);  await publisher.SendMessageAsync(message);  });  }    //  // Subscriber  //   public async Task CreateSubscriber()  {  if (!await SubscriptionExists())  await CreateSubscription();   await CreateConsumer();  }   public async Tasklt;boolgt; SubscriptionExists()  {  var response = await adminClient.SubscriptionExistsAsync(messagePipeName, messagePipeName);  return response.Value;  }   private async Task CreateSubscription()  {  CreateSubscriptionOptions options = new CreateSubscriptionOptions(messagePipeName, messagePipeName)  {  AutoDeleteOnIdle = TimeSpan.MaxValue,  DeadLetteringOnMessageExpiration = true,  MaxDeliveryCount = 3  };   await adminClient.CreateSubscriptionAsync(options);  }   public Task CreateConsumer()  {  var options = new ServiceBusProcessorOptions  {  AutoCompleteMessages = false,  MaxConcurrentCalls = MaxConcurrentCalls,  PrefetchCount = MaxPrefetchCount,  ReceiveMode = ServiceBusReceiveMode.PeekLock  };   consumer = subscriberClient.CreateProcessor(messagePipeName, messagePipeName, options);  consumer.ProcessMessageAsync  = MessageHandler;  consumer.ProcessErrorAsync  = ErrorHandler;  return consumer.StartProcessingAsync();  }   private async Task MessageHandler(ProcessMessageEventArgs args) {  string message = args.Message.Body.ToString();  var result = await Handler(message);   try  {  if (result)  await args.CompleteMessageAsync(args.Message);  else  await args.AbandonMessageAsync(args.Message);  }  catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageLockLost)  {  throw;  }  }   int handlerCount = 0;  int innerHandlerCount = 0;   private async Tasklt;boolgt; Handler(string message)  {  await Task.Delay(100);  await Task.Delay(200);    var localCount = Interlocked.Increment(ref handlerCount);   // 265 - Just at threshold   if (localCount % NumMessagesProcessedBeforeDelay == 0)  {  // Every (N)umber of records similate a long task to test ServiceBusFailureReason.MessageLockLost  // Print time then wait   var localInnerCount = Interlocked.Increment(ref innerHandlerCount);  if (localInnerCount % lineBreak == 0)  Console.WriteLine("n"   DateTime.Now.ToString("HH:mm:ss")   "nn");  else  Console.WriteLine("n"   DateTime.Now.ToString("HH:mm:ss"));   for (int i = 0; i lt; 60; i  )  await Task.Delay(1_000);  }  else  Console.Write(".");   return true;  }    private Task ErrorHandler(ProcessErrorEventArgs args)  {  var sb = new StringBuilder(1000)  .AppendLine("=-=-=-=-=-=-=-=-=-")  .AppendLine($"ErrorSource: {args.ErrorSource}")  .AppendLine($"FullyQualifiedNamespace: {args.FullyQualifiedNamespace}")  .AppendLine($"EntityPath: {args.EntityPath}")  .AppendLine($"Exception: {args.Exception}")  .AppendLine("=-=-=-=-=-=-=-=-=-");   Console.WriteLine(sb.ToString());   return Task.CompletedTask;  }  } }