Как зарегистрироваться для получения сообщений сеанса служебной шины, которые имеют только определенный идентификатор сеанса?

#session #servicebus #azure-sdk-.net #azure-servicebus-topics #azure-servicebus-subscriptions

#сеанс #служебная шина #azure-sdk-.net #azure-servicebus-темы #azure-servicebus-подписки

Вопрос:

  1. Я использую .net Core 3.1 и Microsoft.Лазурный.Служебная шина (версия 5.1.3).
  2. У меня есть раздел служебной шины с одной подпиской, который может обрабатывать только сообщения сеанса.

Клиент темы может отправить 3 сообщения с идентификатором сеанса (скажем, ABCD), а затем отправляет еще 4 сообщения с идентификатором сеанса (XYZ). Довольно легко написать клиент подписки, чтобы получать все 7 сообщений с соответствующими идентификаторами сеанса. Однако я хочу иметь возможность получать сообщения только с идентификатором сеанса XYZ (и не забочусь о сообщениях с идентификатором сеанса ABCD и даже не хочу их получать).

Следующий пример кода для получения всех сообщений со всеми идентификаторами сеанса работает так, как ожидалось:

 static async Task Main(string[] args)
{
    try
    {
        byte[] messageBody = System.Text.Encoding.Unicode.GetBytes("Hello, world!");
        ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);

        SubscriptionClient client = new SubscriptionClient(builder, subscriptionName, ReceiveMode.PeekLock);

        var sessionHandler = new SessionHandlerOptions(ExceptionHandler);
        sessionHandler.AutoComplete = true;
        client.RegisterSessionHandler(SessionMessageHandler, sessionHandler);

        Console.WriteLine("Press any key to exit!");
        Console.ReadKey();

        await client.CloseAsync();
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }        
}

static Task SessionMessageHandler(IMessageSession session, Message message, CancellationToken cancellationToken)
{
    var bodyText = System.Text.Encoding.Unicode.GetString(message.Body);
    Console.WriteLine($"Session id: {message.SessionId}; Body: {bodyText}");
    return Task.CompletedTask;
}

static Task ExceptionHandler(ExceptionReceivedEventArgs args)
{
    var context = args.ExceptionReceivedContext;
    Console.WriteLine($"Exception context: {context.Action}, {context.ClientId}, {context.Endpoint}, {context.EntityPath}");
    Console.WriteLine($"Exception: {args.Exception}");
    return Task.CompletedTask;
}
  

Вопросы:

  1. Как мне изменить приведенный выше код, чтобы я получал сообщения только с идентификатором сеанса XYZ (без получения сообщений с идентификатором сеанса ABCD)?
  2. Если это невозможно с помощью приведенного выше кода, есть ли какой-либо другой способ достичь того, чего я хочу (с той же библиотекой)? Если да, пожалуйста, приведите примеры.

Ответ №1:

Приведенный выше код использует обработчик сеанса, который предназначен для обработки нескольких сеансов, а не только одного сеанса. Если вы хотите обработать один сеанс только с определенным идентификатором, вам нужно будет использовать SessionClient и его AcceptMessageSessionAsync(String) метод, который принимает идентификатор сеанса в качестве параметра.

Ответ №2:

Используя совет Шона, я изменил код на следующий, и он работает. Спасибо, Шон.

     static async Task Main(string[] args)
    {
        try
        {
            Console.WriteLine("Enter session id to listen on ...");
            var sessionId = Console.ReadLine();
            if (sessionId == string.Empty)
            {
                sessionId = "12345";
            }
            Console.WriteLine($"Reading messages with session id: {sessionId}");

            var sessionClient = new SessionClient(connectionStringWithoutEntityPath, subscriberPath, ReceiveMode.PeekLock);

            var messageSession = await sessionClient.AcceptMessageSessionAsync(sessionId);

            if (messageSession != null)
            {
                while(true)
                {
                    Message message = await messageSession.ReceiveAsync();

                    if (message != null)
                    {
                        var bodyText = System.Text.Encoding.Unicode.GetString(message.Body);
                        Console.WriteLine($"Session id: {message.SessionId}; Body: {bodyText}");
                        await messageSession.CompleteAsync(message.SystemProperties.LockToken);
                    }
                    else
                    {
                        Console.WriteLine("Press Enter to keep reading. Otherwise press any other key to exit.");
                        if (Console.ReadLine() != string.Empty)
                        {
                            break;
                        }
                    }
                }
            }

            await sessionClient.CloseAsync();
            Console.WriteLine("Press any key to exit!");
            Console.ReadKey();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }        
    }