#azure-servicebus-subscriptions
#azure-служебный автобус-подписки
Вопрос:
Более подробное описание: У меня есть один потребитель с MaxConcurrentCalls=40. Потребитель ServiceBusProcessor установит длительную задержку (60 секунд) при обработке одного сообщения после каждого сообщения NumMessagesProcessedBeforeDelay.
В зависимости от размера MaxPrefetchCount (5/10/15) обработка всех сообщений прекращается после (3/5/6) количества сообщений с длительной задержкой, соответственно.
Когда мои компьютерные часы достигнут 1 минуты после отображаемого времени, они снова начнут обработку. Это может продолжаться или прекращаться в зависимости от следующего показа.
Обработка ожиданий должна продолжаться для других (MaxConcurrentCalls — потоки с задержкой)
Дополнительный тест Когда я писал это, у меня возникла дополнительная мысль отключить предварительную выборку MaxPrefetchCount = 0, и, как ни странно, это не только не останавливает обработку, но и на самом деле кажется быстрее.
Я спрашиваю, есть ли у кого-нибудь какие-либо мысли о том, какова корреляция между количеством предварительной выборки и количеством отложенных сообщений, относительно того, почему обработка останавливается или почему в целом обработка сообщений останавливается, когда у меня есть 35 одновременных вызовов, доступных для запуска.
исходный код
Ниже приведен полный исходный код для воспроизведения проблемы. Вам нужно будет создать пространство имен ASB и обновить строку подключения. Все остальное будет создано для вас.
// // .NET Core 3.1 // Azure.Messaging.ServiceBus v7.5.0 // Service Bus Namespace Subscription: Development / Standard // Visual Studio 2019 - Version 16.10.3 // using System; using System.Collections.Concurrent; using System.Text; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; namespace MyDemo { class Program { const string connectionString = "Endpoint=sb://...."; private readonly ServiceBusAdministrationClient adminClient; private readonly ServiceBusClient publisherClient; private readonly ServiceBusClient subscriberClient; private readonly ServiceBusSender publisher; private ServiceBusProcessor consumer; const string messagePipeName = "MyTestTopic"; // // Message Creation // const int NumOfMessages = 10_000; const int NumOfMessagesPerLoop = 100; // // Message Processing // const int MaxConcurrentCalls = 40; const int NumMessagesProcessedBeforeDelay = 150; // 230 = threshold of continuous run MCC = 40 MPC = 15 //const int MaxPrefetchCount = 0; const int lineBreak = 5; //const int MaxPrefetchCount = 5; const int lineBreak = 3; //const int MaxPrefetchCount = 10; const int lineBreak = 5; const int MaxPrefetchCount = 15; const int lineBreak = 6; static async Task Main(string[] args) { // // Setup // var program = new Program(); await program.CreateMessagePipe(); await program.CreateSubscriber(); Console.WriteLine("Press Enter To Continue..."); // // Send Messages // await Task.Delay(5_000); // wait for Subscriber to be ready before sending messages await program.PublishMessages(); Console.ReadLine(); } public Program() { adminClient = new ServiceBusAdministrationClient(connectionString); publisherClient = new ServiceBusClient(connectionString); publisher = publisherClient.CreateSender(messagePipeName); var options = new ServiceBusReceiverOptions(); subscriberClient = new ServiceBusClient(connectionString); subscriberClient.CreateReceiver(messagePipeName, messagePipeName, options); } // // Message Pipe // public async Task CreateMessagePipe() { if (await MessagePipeExists(messagePipeName)) return; var options = new CreateTopicOptions(messagePipeName); await adminClient.CreateTopicAsync(options); } public async Tasklt;boolgt; MessagePipeExists(string messagePipeName) { var response = await adminClient.TopicExistsAsync(messagePipeName); return response.Value; } // // Publisher // public async Task PublishMessages() { var options = new ParallelOptions() { MaxDegreeOfParallelism = 10 }; int numLoops = (NumOfMessages / NumOfMessagesPerLoop); for (int loop = 0; loop lt; numLoops; loop ) { var tasks = new ConcurrentBaglt;Taskgt;(); Parallel.For(0, NumOfMessagesPerLoop, options, (i) =gt; { tasks.Add(SendMessage()); }); await Task.WhenAll(tasks); } } int counter = 0; public Task SendMessage() { return Task.Run(async () =gt; { var localCounter = Interlocked.Increment(ref counter); var text = $"Some Random Message: {localCounter:D8}"; var message = new ServiceBusMessage(text); await publisher.SendMessageAsync(message); }); } // // Subscriber // public async Task CreateSubscriber() { if (!await SubscriptionExists()) await CreateSubscription(); await CreateConsumer(); } public async Tasklt;boolgt; SubscriptionExists() { var response = await adminClient.SubscriptionExistsAsync(messagePipeName, messagePipeName); return response.Value; } private async Task CreateSubscription() { CreateSubscriptionOptions options = new CreateSubscriptionOptions(messagePipeName, messagePipeName) { AutoDeleteOnIdle = TimeSpan.MaxValue, DeadLetteringOnMessageExpiration = true, MaxDeliveryCount = 3 }; await adminClient.CreateSubscriptionAsync(options); } public Task CreateConsumer() { var options = new ServiceBusProcessorOptions { AutoCompleteMessages = false, MaxConcurrentCalls = MaxConcurrentCalls, PrefetchCount = MaxPrefetchCount, ReceiveMode = ServiceBusReceiveMode.PeekLock }; consumer = subscriberClient.CreateProcessor(messagePipeName, messagePipeName, options); consumer.ProcessMessageAsync = MessageHandler; consumer.ProcessErrorAsync = ErrorHandler; return consumer.StartProcessingAsync(); } private async Task MessageHandler(ProcessMessageEventArgs args) { string message = args.Message.Body.ToString(); var result = await Handler(message); try { if (result) await args.CompleteMessageAsync(args.Message); else await args.AbandonMessageAsync(args.Message); } catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessageLockLost) { throw; } } int handlerCount = 0; int innerHandlerCount = 0; private async Tasklt;boolgt; Handler(string message) { await Task.Delay(100); await Task.Delay(200); var localCount = Interlocked.Increment(ref handlerCount); // 265 - Just at threshold if (localCount % NumMessagesProcessedBeforeDelay == 0) { // Every (N)umber of records similate a long task to test ServiceBusFailureReason.MessageLockLost // Print time then wait var localInnerCount = Interlocked.Increment(ref innerHandlerCount); if (localInnerCount % lineBreak == 0) Console.WriteLine("n" DateTime.Now.ToString("HH:mm:ss") "nn"); else Console.WriteLine("n" DateTime.Now.ToString("HH:mm:ss")); for (int i = 0; i lt; 60; i ) await Task.Delay(1_000); } else Console.Write("."); return true; } private Task ErrorHandler(ProcessErrorEventArgs args) { var sb = new StringBuilder(1000) .AppendLine("=-=-=-=-=-=-=-=-=-") .AppendLine($"ErrorSource: {args.ErrorSource}") .AppendLine($"FullyQualifiedNamespace: {args.FullyQualifiedNamespace}") .AppendLine($"EntityPath: {args.EntityPath}") .AppendLine($"Exception: {args.Exception}") .AppendLine("=-=-=-=-=-=-=-=-=-"); Console.WriteLine(sb.ToString()); return Task.CompletedTask; } } }