Предложения по реализации блока сеанса для служебной шины

#azure #.net-core #messaging #servicebus #azure-servicebus-queues

#azure #.net-ядро #обмен сообщениями #servicebus #azure-servicebus-очереди

Вопрос:

В настоящее время у нас есть приложение dotnet core с фоновой службой, которое получает сообщение от служебной шины с включенным сеансом, где sessionId есть userId , и сообщение содержит обновления информации о пользователе. Теперь мы хотели бы реализовать функцию для временной приостановки обновлений для определенного пользователя путем блокировки определенного userId/sessionId , но при этом обрабатывать сообщения по порядку при разблокировке. Каков наилучший способ решения этой проблемы?

Я попытался просмотреть документацию и образцы служебной шины. В основном, отсрочка сообщения, состояние сеанса сообщения и пример состояния сеанса

И я нашел некоторую информацию о SessionState и отсрочке сообщений, и мне интересно, можно ли их использовать для реализации этой функции и по-прежнему гарантировать порядок обработки (FIFO независимо от того, было ли отложено сообщение). Я думал о попытке сохранить порядковый номер в состоянии сеанса и продолжать получать отложенные сообщения через этот номер и увеличивать его для получения следующего сообщения, пока у меня не закончатся сообщения?

В настоящее время наш код выглядит примерно так:

             this.queue.RegisterSessionHandler(
                this.SessionHandler,
                new SessionHandlerOptions(this.ExceptionHandler)
                {
                    AutoComplete = false,
                    MessageWaitTimeout = TimeSpan.FromMinutes(1),
                });

  

Где this.SessionHandler — функция, которая обрабатывает сообщение, затем завершает и закрывает сеанс, вызывая session.CompleteAsync и session.CloseAsync . Однако у меня возникли проблемы с концептуализацией того, как добавить логику отсрочки в наш код. Потому что в настоящее время RegisterSessionHandler уже обрабатывает блокировки сеанса и выполняет балансировку нагрузки на сообщения с sessionId помощью (я полагаю), что здорово. Но RegisterSessionHandler также не позволяет указать конкретный sessionId процесс.

Допустим, у меня отложено несколько сообщений userId/sessionId: A . И когда я хочу разблокировать обработку для этого пользователя, я не могу просто вставить отложенное сообщение обратно в очередь. Поскольку отправитель все равно будет постоянно отправлять сообщения для пользователя A в очередь, и это нарушит порядок.

Пример состояния сеанса, о котором я упоминал выше, содержит хороший пример использования состояния сеанса и обработки отложенных сообщений. Однако он использует только один sessionId и не использует RegisterSessionHandler . Мой вопрос: если мы хотим реализовать логику отложенной обработки сообщений (с сохранением порядка), должны ли мы реализовать нашу собственную RegisterSessionHandler и иметь дело с sessionId балансировкой нагрузки?

Заранее спасибо!

Комментарии:

1. Из вопроса не совсем ясно, с какой проблемой вы столкнулись с опубликованным вами кодом? Кроме того, когда вы говорите: «Но я не мог понять, как они работают из документации.», укажите ссылку на документ, и что именно непонятно? По сценарию вы находитесь в правильном направлении.

2. @krishg Извините, я раньше не очень ясно выразился. Я обновил свой вопрос в соответствии с вашими предложениями. Пожалуйста, дайте мне знать, если вы обнаружите что-то еще неясное. Спасибо!

Ответ №1:

Вы должны использовать SessionClient вместо использования RegisterSessionHandler в QueueClient, чтобы лучше обрабатывать сценарий отсрочки и сохранять порядок. Вы можете сохранить некоторый номер шага / последовательности в теле вашего сообщения. Также добавьте LastProcessedStep / Seqence, когда вы фактически обрабатываете сообщение. Состояние сеанса позволяет отслеживать состояние обработки, связанное обработчиком с сеансом, чтобы клиенты могли гибко переключаться между обрабатывающими узлами (включая переход на другой ресурс) во время обработки сеанса. Образец обрабатывает отложенное сообщение, сохраняя его (Шаг). Он сочетает в себе функции отсрочки и сеанса, так что средство состояния сеанса используется для отслеживания состояния обработки рабочего процесса, когда входные данные для соответствующих шагов поступают не в ожидаемом порядке. Обратите внимание также на код отправителя, который демонстрирует, что, отправляя сообщения в непредсказуемом порядке, но в силу состояния сеанса, получатель определяет порядок.

 //   
//   Copyright © Microsoft Corporation, All Rights Reserved
// 
//   Licensed under the Apache License, Version 2.0 (the "License"); 
//   you may not use this file except in compliance with the License. 
//   You may obtain a copy of the License at
// 
//   http://www.apache.org/licenses/LICENSE-2.0 
// 
//   THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS
//   OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION
//   ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A
//   PARTICULAR PURPOSE, MERCHANTABILITY OR NON-INFRINGEMENT.
// 
//   See the Apache License, Version 2.0 for the specific language
//   governing permissions and limitations under the License. 

namespace SessionState
{
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Core;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;

    public class Program : MessagingSamples.Sample
    {
        public async Task Run(string connectionString)
        {
            Console.WriteLine("Press any key to exit the scenario");

            var sendTask = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
            var sendTask2 = this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName);
            var receiveTask = this.ReceiveMessagesAsync(connectionString, SessionQueueName);

            await Task.WhenAll(sendTask, sendTask2, receiveTask);
        }

        async Task SendMessagesAsync(string session, string connectionString, string queueName)
        {
            var sender = new MessageSender(connectionString, queueName);


            Console.WriteLine("Sending messages to Queue...");

            ProcessingState[] data = new[]
            {
                new ProcessingState {Step = 1, Title = "Buy"},
                new ProcessingState {Step = 2, Title = "Unpack"},
                new ProcessingState {Step = 3, Title = "Prepare"},
                new ProcessingState {Step = 4, Title = "Cook"},
                new ProcessingState {Step = 5, Title = "Eat"},
            };

            var rnd = new Random();
            var tasks = new List<Task>();
            for (int i = 0; i < data.Length; i  )
            {
                var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
                {
                    SessionId = session,
                    ContentType = "application/json",
                    Label = "RecipeStep",
                    MessageId = i.ToString(),
                    TimeToLive = TimeSpan.FromMinutes(2)
                };

                tasks.Add(Task.Delay(rnd.Next(30)).ContinueWith(
                      async (t) =>
                      {
                          await sender.SendAsync(message);
                          lock (Console.Out)
                          {
                              Console.ForegroundColor = ConsoleColor.Yellow;
                              Console.WriteLine("Message sent: Id = {0}", message.MessageId);
                              Console.ResetColor();
                          }
                      }));
            }
            await Task.WhenAll(tasks);
        }

        async Task ReceiveMessagesAsync(string connectionString, string queueName)
        {
            var client = new SessionClient(connectionString, queueName, ReceiveMode.PeekLock);

            while (true)
            {
                var session = await client.AcceptMessageSessionAsync();
                await Task.Run(
                    async () =>
                    {
                        ProcessingState processingState;

                        var stateData = await session.GetStateAsync();
                        if (stateData != null)
                        {
                            processingState = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(stateData));
                        }
                        else
                        {
                            processingState = new ProcessingState
                            {
                                LastProcessedRecipeStep = 0,
                                DeferredSteps = new Dictionary<int, long>()
                            };
                        }

                        while (true)
                        {
                            try
                            {
                                //receive messages from Queue
                                var message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
                                if (message != null)
                                {
                                    if (message.Label != null amp;amp;
                                        message.ContentType != null amp;amp;
                                        message.Label.Equals("RecipeStep", StringComparison.InvariantCultureIgnoreCase) amp;amp;
                                        message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
                                    {
                                        var body = message.Body;

                                        ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                        if (recipeStep.Step == processingState.LastProcessedRecipeStep   1)
                                        {
                                            lock (Console.Out)
                                            {
                                                Console.ForegroundColor = ConsoleColor.Cyan;
                                                Console.WriteLine(
                                                    "ttttMessage received: nttttttMessageId = {0}, nttttttSequenceNumber = {1}, nttttttEnqueuedTimeUtc = {2},"  
                                                    "nttttttExpiresAtUtc = {5}, nttttttContentType = "{3}", nttttttSize = {4},  nttttttContent: [ step = {6}, title = {7} ]",
                                                    message.MessageId,
                                                    message.SystemProperties.SequenceNumber,
                                                    message.SystemProperties.EnqueuedTimeUtc,
                                                    message.ContentType,
                                                    message.Size,
                                                    message.ExpiresAtUtc,
                                                    recipeStep.Step,
                                                    recipeStep.Title);
                                                Console.ResetColor();
                                            }
                                            await session.CompleteAsync(message.SystemProperties.LockToken);
                                            processingState.LastProcessedRecipeStep = recipeStep.Step;
                                            await
                                                session.SetStateAsync(
                                                    Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                        else
                                        {
// in your case, if customer update is blocked, you can defer
                                            processingState.DeferredSteps.Add((int)recipeStep.Step, (long)message.SystemProperties.SequenceNumber);
                                            await session.DeferAsync(message.SystemProperties.LockToken);
                                            await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                    }
                                    else
                                    {
                                        await session.DeadLetterAsync(message.SystemProperties.LockToken);//, "ProcessingError", "Don't know what to do with this message");
                                    }
                                }
                                else
                                {
                                    while (processingState.DeferredSteps.Count > 0)
                                    {
                                        long step;

                                        if (processingState.DeferredSteps.TryGetValue(processingState.LastProcessedRecipeStep   1, out step))
                                        {
                                            var deferredMessage = await session.ReceiveDeferredMessageAsync(step);
                                            var body = deferredMessage.Body;
                                            ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                            lock (Console.Out)
                                            {
                                                Console.ForegroundColor = ConsoleColor.Cyan;
                                                Console.WriteLine(
                                                    "ttttdeferredMessage received: nttttttMessageId = {0}, nttttttSequenceNumber = {1}, nttttttEnqueuedTimeUtc = {2},"  
                                                    "nttttttExpiresAtUtc = {5}, nttttttContentType = "{3}", nttttttSize = {4},  nttttttContent: [ step = {6}, title = {7} ]",
                                                    deferredMessage.MessageId,
                                                    deferredMessage.SystemProperties.SequenceNumber,
                                                    deferredMessage.SystemProperties.EnqueuedTimeUtc,
                                                    deferredMessage.ContentType,
                                                    deferredMessage.Size,
                                                    deferredMessage.ExpiresAtUtc,
                                                    recipeStep.Step,
                                                    recipeStep.Title);
                                                Console.ResetColor();
                                            }
                                            await session.CompleteAsync(deferredMessage.SystemProperties.LockToken);
                                            processingState.LastProcessedRecipeStep = processingState.LastProcessedRecipeStep   1;
                                            processingState.DeferredSteps.Remove(processingState.LastProcessedRecipeStep);
                                            await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                    }
                                    break;
                                }
                            }
                            catch (ServiceBusException e)
                            {
                                if (!e.IsTransient)
                                {
                                    Console.WriteLine(e.Message);
                                    throw;
                                }
                            }
                        }
                        await session.CloseAsync();
                    });
            }
        }

       public static int Main(string[] args)
        {
            try
            {
                var app = new Program();
                app.RunSample(args, app.Run);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
                return 1;
            }
            return 0;
        }

        class ProcessingState
        {
            [JsonProperty]
            public int LastProcessedRecipeStep { get; set; }
            [JsonProperty]
            public Dictionary<int, long> DeferredSteps { get; set; }
            [JsonProperty]
            public int Step { get; internal set; }
            [JsonProperty]
            public string Title { get; internal set; }
        }
    }
}
  

Вы также можете следить за сообщениями о заказе в служебной шине Azure, в которых очень хорошо кратко объясняется концепция. Но приведенный там пример немного отличается от приведенного выше.

ПРЕДУПРЕЖДЕНИЕ: использование сеанса сообщений также означает, что для сеанса (в вашем случае идентификатор пользователя) сообщения в этом сеансе всегда будут приниматься и обрабатываться одним получателем. Поэтому будьте внимательны при настройке сеанса и идентификатора сеанса. Если вы создадите очень большой сеанс, это заставит служебную шину Azure отправлять большинство сообщений одному подписчику, уменьшая преимущества многопоточности. Если вы задаете сеансы слишком детализированными, это теряет свою предполагаемую выгоду, и вы просто добавляете ненужные накладные расходы.

Комментарии:

1. Спасибо за ваш ответ! Однако приведенный здесь пример демонстрирует обработку сообщений только с одним идентификатором сеанса. Я видел используемый код SessionClient(connectionString, queueName, ReceiveMode.PeekLock); , и client.AcceptMessageSessionAsync(); если я хочу иметь фоновую задачу для одновременной обработки нескольких сеансов при включенной блокировке сеанса (чтобы одновременно не обрабатывались два сообщения из одного сеанса), будет ли у меня несколько сеансовых клиентов и чтобы все они вызывались AcceptMessageSessionAsync в цикле for? Спасибо.

2. Он также будет работать с несколькими идентификаторами сеанса. Вы можете протестировать, обновив логику на стороне отправителя. Приемник представляет собой непрерывный цикл, в котором он ожидает сеанса AcceptMessageSessionAsync ., и для каждого сеанса он запускает задачу, которая, в свою очередь, выполняет цикл. Сталкивались ли вы с какой-либо проблемой во время запуска? Где вы видите идентификатор сеанса, переданный AcceptMessageSessionAsync методу?

3. Извините, если я не был ясен. Я имел в виду, что если мы хотим одновременно обрабатывать несколько сообщений (с разными идентификаторами сеанса), должны ли мы также создавать несколько сеансовых клиентов? Спасибо!

4. Нет, вы этого не делаете. Для вашего удобства я только что обновил метод Run выше, чтобы отправлять другой набор сообщений с другим идентификатором сеанса. Но приемник остается прежним. Он уже запускает задачу для каждого сеанса (во внешнем блоке `while (true)’). Я бы предложил запустить его для отладки для лучшего понимания. Это всего лишь простое консольное приложение, которое вы можете запустить на своем локальном компьютере 🙂 github.com/Azure/azure-service-bus/tree/master/samples/DotNet /…

5. Вы также можете следить за сообщениями о заказе в служебной шине Azure , в которых очень хорошо кратко объясняется концепция. Но приведенный там пример немного отличается от приведенного выше.