#c# #azureservicebus #azure-servicebus-topics #azure-servicebus-subscriptions
#c# #azureservicebus #azure-servicebus-темы #azure-servicebus-подписки
Вопрос:
[редактировать: я переформулировал и упростил свой первоначальный вопрос]
Я использую тему / подписки на служебную шину Azure с сеансами, и у меня возникают проблемы с закрытием сеансов.
Для получения справочной информации мое приложение получает данные из сеанса подписки на тему (требование FIFO), который я поддерживаю большую часть времени. Только время от времени нам нужно на мгновение «приостановить» поток данных.
Когда запрашивается «пауза» в этом потоке данных, мы выходим из сеанса подписки и ждем запроса на повторное открытие сеанса.
// pseudo code
public class Test
{
public static async Task Me()
{
var client = new SubscriptionClient(
EndPoint,
Path,
Name,
TokenProvider,
TransportType.Amqp,
ReceiveMode.PeekLock,
new RetryExponential(
minimumBackoff: TimeSpan.FromSeconds(1),
maximumBackoff: TimeSpan.FromSeconds(30),
maximumRetryCount: 10));
// Setup consumer options
var sessionOptions = new SessionHandlerOptions(OnHandleExceptionReceived)
{
AutoComplete = false,
MessageWaitTimeout = TimeSpan.FromSeconds(10),
MaxConcurrentSessions = 1,
};
// Registration 1 - Start data flow
client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);
// Wait 1 - Artificially wait for 'data flow pause' to kick in.
// For the sake of this example, we artificially give plenty
// of time to the message session handler to receive something
// and close the session.
Task.Wait(TimeSpan.FromSeconds(30));
// Registration 2 - Artificially 'unpause' data flow
client.RegisterSessionHandler(OnMessageSessionAsync, sessionOptions);
// Wait 2 - Artificially wait for 'pause' to kick in again
Task.Wait(TimeSpan.FromSeconds(30));
// Finally close client
await client.CloseAsync();
}
private static async Task OnMessageSessionAsync(IMessageSession session, Message message, CancellationToken cancellationToken)
{
try
{
await client.CompleteAsync(message.SystemProperties.LockToken);
// Process message .. It doesn't matter what it is,
// just that at some point I want to break away from session
if (bool.TryParse(message.UserProperties["SessionCompleted"] as string, out bool completed) amp;amp; completed)
await session.CloseAsync(); // <-- This never works
}
catch (Exception e)
{
Console.WriteLine("OnMessageSessionAsync exception: {0}", e);
// Indicates a problem, unlock message in subscription.
await client.AbandonAsync(message.SystemProperties.LockToken);
}
}
private static Task OnHandleExceptionReceived(ExceptionReceivedEventArgs e)
{
var context = e.ExceptionReceivedContext;
Options.Logger?.LogWarning(e.Exception, new StringBuilder()
.AppendLine($"Message handler encountered an exception {e.Exception.GetType().Name}.")
.AppendLine("Exception context for troubleshooting:")
.AppendLine($" - Endpoint: {context.Endpoint}")
.AppendLine($" - Entity Path: {context.EntityPath}")
.Append($" - Executing Action: {context.Action}"));
return Task.CompletedTask;
}
}
Вопросы :
Как указывалось ранее, у меня возникла проблема с выходом из сеанса, поскольку вызов session.CloseAsync()
кажется неработоспособным. Сообщения продолжают появляться, хотя я явно попросил сеанс прекратить.
- Это нормальное поведение, когда сеанс темы не может быть закрыт напрямую? Если да, то зачем вообще раскрывать вызов
session.CloseAsync()
? - Могу ли я на самом деле закрыть сеанс независимо от подключения по подписке?
ps1: я основал свой код на официальном образце, доступном на github.com от Microsoft. И хотя этот пример основан на сеансе очереди, а не на сеансе темы, мне кажется логичным, что поведение должно быть идентичным.
ps2: я подробно описал, что может быть причиной в Microsoft.Azure. Репозиторий ServiceBus и мне интересно, отсутствует ли инициализация переменной под капотом MessageSession.OwnsConnection
свойства..