Сеанс подписки на служебную шину Azure (тема) не закрывается по запросу

#c# #azureservicebus #azure-servicebus-topics #azure-servicebus-subscriptions

#c# #azureservicebus #azure-servicebus-темы #azure-servicebus-подписки

Вопрос:

[редактировать: я переформулировал и упростил свой первоначальный вопрос]

Я использую тему / подписки на служебную шину Azure с сеансами, и у меня возникают проблемы с закрытием сеансов.

Для получения справочной информации мое приложение получает данные из сеанса подписки на тему (требование FIFO), который я поддерживаю большую часть времени. Только время от времени нам нужно на мгновение «приостановить» поток данных.

Когда запрашивается «пауза» в этом потоке данных, мы выходим из сеанса подписки и ждем запроса на повторное открытие сеанса.

 // pseudo code
public class Test
{
    public static async Task Me()
    {
        var client = new SubscriptionClient(
          EndPoint,
          Path,
          Name,
          TokenProvider,
          TransportType.Amqp,
          ReceiveMode.PeekLock,
          new RetryExponential(
            minimumBackoff: TimeSpan.FromSeconds(1),
            maximumBackoff: TimeSpan.FromSeconds(30),
            maximumRetryCount: 10));
            
        // Setup consumer options
        var sessionOptions = new SessionHandlerOptions(OnHandleExceptionReceived)
        {
            AutoComplete = false,
            MessageWaitTimeout = TimeSpan.FromSeconds(10),
            MaxConcurrentSessions = 1,
        };

        // Registration 1 - Start data flow
        client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);

        // Wait 1 - Artificially wait for 'data flow pause' to kick in.
        //          For the sake of this example, we artificially give plenty 
        //          of time to the message session handler to receive something
        //          and close the session.
        Task.Wait(TimeSpan.FromSeconds(30));

        // Registration 2 - Artificially 'unpause' data flow
        client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);

        // Wait 2 - Artificially wait for 'pause' to kick in again
        Task.Wait(TimeSpan.FromSeconds(30));

        // Finally close client
        await client.CloseAsync();
    }

    private static async Task OnMessageSessionAsync(IMessageSession session, Message message, CancellationToken cancellationToken)
    {
        try
        {
            await client.CompleteAsync(message.SystemProperties.LockToken);

            // Process message .. It doesn't matter what it is, 
            // just that at some point I want to break away from session
            if (bool.TryParse(message.UserProperties["SessionCompleted"] as string, out bool completed) amp;amp; completed)
                await session.CloseAsync(); // <-- This never works
        }
        catch (Exception e)
        {
            Console.WriteLine("OnMessageSessionAsync exception: {0}", e);

            // Indicates a problem, unlock message in subscription.
            await client.AbandonAsync(message.SystemProperties.LockToken);
        }
    }

    private static Task OnHandleExceptionReceived(ExceptionReceivedEventArgs e)
    {
        var context = e.ExceptionReceivedContext;

        Options.Logger?.LogWarning(e.Exception, new StringBuilder()
            .AppendLine($"Message handler encountered an exception {e.Exception.GetType().Name}.")
            .AppendLine("Exception context for troubleshooting:")
            .AppendLine($" - Endpoint: {context.Endpoint}")
            .AppendLine($" - Entity Path: {context.EntityPath}")
            .Append($" - Executing Action: {context.Action}"));

        return Task.CompletedTask;
    }
}
  

Вопросы :

Как указывалось ранее, у меня возникла проблема с выходом из сеанса, поскольку вызов session.CloseAsync() кажется неработоспособным. Сообщения продолжают появляться, хотя я явно попросил сеанс прекратить.

  • Это нормальное поведение, когда сеанс темы не может быть закрыт напрямую? Если да, то зачем вообще раскрывать вызов session.CloseAsync() ?
  • Могу ли я на самом деле закрыть сеанс независимо от подключения по подписке?

ps1: я основал свой код на официальном образце, доступном на github.com от Microsoft. И хотя этот пример основан на сеансе очереди, а не на сеансе темы, мне кажется логичным, что поведение должно быть идентичным.

ps2: я подробно описал, что может быть причиной в Microsoft.Azure. Репозиторий ServiceBus и мне интересно, отсутствует ли инициализация переменной под капотом MessageSession.OwnsConnection свойства..