#session #servicebus #azure-sdk-.net #azure-servicebus-topics #azure-servicebus-subscriptions
#сеанс #служебная шина #azure-sdk-.net #azure-servicebus-темы #azure-servicebus-подписки
Вопрос:
- Я использую .net Core 3.1 и Microsoft.Лазурный.Служебная шина (версия 5.1.3).
- У меня есть раздел служебной шины с одной подпиской, который может обрабатывать только сообщения сеанса.
Клиент темы может отправить 3 сообщения с идентификатором сеанса (скажем, ABCD), а затем отправляет еще 4 сообщения с идентификатором сеанса (XYZ). Довольно легко написать клиент подписки, чтобы получать все 7 сообщений с соответствующими идентификаторами сеанса. Однако я хочу иметь возможность получать сообщения только с идентификатором сеанса XYZ (и не забочусь о сообщениях с идентификатором сеанса ABCD и даже не хочу их получать).
Следующий пример кода для получения всех сообщений со всеми идентификаторами сеанса работает так, как ожидалось:
static async Task Main(string[] args)
{
try
{
byte[] messageBody = System.Text.Encoding.Unicode.GetBytes("Hello, world!");
ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);
SubscriptionClient client = new SubscriptionClient(builder, subscriptionName, ReceiveMode.PeekLock);
var sessionHandler = new SessionHandlerOptions(ExceptionHandler);
sessionHandler.AutoComplete = true;
client.RegisterSessionHandler(SessionMessageHandler, sessionHandler);
Console.WriteLine("Press any key to exit!");
Console.ReadKey();
await client.CloseAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
static Task SessionMessageHandler(IMessageSession session, Message message, CancellationToken cancellationToken)
{
var bodyText = System.Text.Encoding.Unicode.GetString(message.Body);
Console.WriteLine($"Session id: {message.SessionId}; Body: {bodyText}");
return Task.CompletedTask;
}
static Task ExceptionHandler(ExceptionReceivedEventArgs args)
{
var context = args.ExceptionReceivedContext;
Console.WriteLine($"Exception context: {context.Action}, {context.ClientId}, {context.Endpoint}, {context.EntityPath}");
Console.WriteLine($"Exception: {args.Exception}");
return Task.CompletedTask;
}
Вопросы:
- Как мне изменить приведенный выше код, чтобы я получал сообщения только с идентификатором сеанса XYZ (без получения сообщений с идентификатором сеанса ABCD)?
- Если это невозможно с помощью приведенного выше кода, есть ли какой-либо другой способ достичь того, чего я хочу (с той же библиотекой)? Если да, пожалуйста, приведите примеры.
Ответ №1:
Приведенный выше код использует обработчик сеанса, который предназначен для обработки нескольких сеансов, а не только одного сеанса. Если вы хотите обработать один сеанс только с определенным идентификатором, вам нужно будет использовать SessionClient
и его AcceptMessageSessionAsync(String)
метод, который принимает идентификатор сеанса в качестве параметра.
Ответ №2:
Используя совет Шона, я изменил код на следующий, и он работает. Спасибо, Шон.
static async Task Main(string[] args)
{
try
{
Console.WriteLine("Enter session id to listen on ...");
var sessionId = Console.ReadLine();
if (sessionId == string.Empty)
{
sessionId = "12345";
}
Console.WriteLine($"Reading messages with session id: {sessionId}");
var sessionClient = new SessionClient(connectionStringWithoutEntityPath, subscriberPath, ReceiveMode.PeekLock);
var messageSession = await sessionClient.AcceptMessageSessionAsync(sessionId);
if (messageSession != null)
{
while(true)
{
Message message = await messageSession.ReceiveAsync();
if (message != null)
{
var bodyText = System.Text.Encoding.Unicode.GetString(message.Body);
Console.WriteLine($"Session id: {message.SessionId}; Body: {bodyText}");
await messageSession.CompleteAsync(message.SystemProperties.LockToken);
}
else
{
Console.WriteLine("Press Enter to keep reading. Otherwise press any other key to exit.");
if (Console.ReadLine() != string.Empty)
{
break;
}
}
}
}
await sessionClient.CloseAsync();
Console.WriteLine("Press any key to exit!");
Console.ReadKey();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}