Как извлекать сообщения из очереди безответных писем RabbitMQ с помощью MassTransit?

#c# #rabbitmq #masstransit

#c# #rabbitmq #masstransit

Вопрос:

Я пытаюсь уведомить пользователя, когда сообщение не получено через некоторое время, используя MassTransit и RabbitMQ.

Из того, что я прочитал, время ожидания устанавливается с помощью свойства TimeToLive при публикации сообщения. По истечении указанного времени сообщение должно быть автоматически добавлено в очередь нерассмотренных писем с именем «_skipped» в конце.

Как мне извлекать сообщения из очередей мертвых писем? В моей попытке, приведенной ниже, сообщение добавляется в обе очереди сразу, и время его ожидания никогда не истекает.

Я думаю, что мог бы сделать это с помощью sagas, но это кажется слишком сложным решением для такой простой проблемы, поэтому я хотел бы избежать его использования, если это вообще возможно.

 static void Main(string[] args)
{
    var bus = CreateBus("rabbitmq://localhost/", "guest", "guest", true);

    var msg = new TestMessage("First Message");
    LogMessageSent(msg);
    bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));

    Console.ReadKey();

    bus.Stop();

    bus = CreateBus("rabbitmq://localhost/", "guest", "guest", false);

    msg = new TestMessage("SecondMessage");
    LogMessageSent(msg);
    bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));

    Console.ReadKey();

    bus.Stop();
}

private static IBusControl CreateBus(string rabbitUrl, string username, string password, bool enableEndpoint)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(c =>
    {
        var host = c.Host(new Uri(rabbitUrl), h =>
        {
            h.Username(username);
            h.Password(password);
        });

        if (enableEndpoint)
        {
            c.ReceiveEndpoint(host, "TestQueue", x =>
            {
                x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
            });
        }

        c.ReceiveEndpoint(host, "TestQueue_skipped", x =>
        {
            x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_skipped"));
        });
    });

    bus.Start();

    return bus;
}

private static void LogMessageSent(TestMessage msg)
{
    Console.WriteLine(string.Format("{0} - Message "{1}" sent.", DateTime.Now.ToString("HH:mm:ss"), msg.Content));
}

private static Task LogMessageReceived(TestMessage msg, string queueName)
{
    Console.WriteLine(string.Format("{0} - Message "{1}" received on queue "{2}".", DateTime.Now.ToString("HH:mm:ss"), msg.Content, queueName));
    return Task.CompletedTask;
}

public class TestMessage
{
    public string Content { get; }

    public TestMessage(string content)
    {
        Content = content;
    }
}
  

Ответ №1:

Поскольку вы вызываете Publish , сообщение отправляется каждому подписчику. Поскольку каждая конечная точка приема добавляет потребителя, это создает подписку (и последующую привязку exchange в RabbitMQ) для этого типа сообщения. Вы можете отключить это, указав BindMessageExchanges = false на этой пропущенной конечной точке приема. Вам нужно будет вручную удалить привязку exchange к брокеру.

Что касается вашего вопроса TimeToLive, это не так, как это работает. Время ожидания передается брокеру, и если срок действия сообщения истекает, оно перемещается в указанную брокером очередь просроченных писем, если так настроено. Оно не перемещается в пропущенную очередь, которая имеет другое значение в MassTransit. В MassTransit пропущенная очередь предназначена для сообщений, которые доставляются на конечную точку приема, но на этой конечной точке не был настроен потребитель для получения сообщения.

Для RabbitMQ вы можете настроить очередь просроченных писем в MassTransit с помощью:

 endpoint.BindDeadLetterQueue("dead-letter-queue-name");
  

Это настроит брокер таким образом, что сообщения, достигшие их TTL, будут перемещены в указанный обмен / очередь. Тогда ваш потребитель на этой конечной точке приема сможет их использовать (опять же, не забудьте установить BindMessageExchanges = false на конечной точке получения deadletter.

Например:

 c.ReceiveEndpoint(host, "TestQueue_expired", x =>
{
    x.BindMessageExchanges = false;
    x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_expired"));
});
  

А затем ваша исходная конечная точка приема:

 c.ReceiveEndpoint(host, "TestQueue", x =>
{
    x.BindDeadLetterQueue("TestQueue_expired");
    x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
});
  

Комментарии:

1. Я запустил пример с предложенными вами изменениями кода, и сообщение было добавлено в очередь «_expired» по таймауту, но оно также было добавлено в ту же очередь сразу, как если бы конфигурация «BindMessageExchanges = false» была проигнорирована. Есть ли какая-либо дополнительная конфигурация, которую мне нужно добавить?

2. Как я уже сказал, вам нужно зайти в консоль управления RabbitMQ и удалить привязку, которая уже была создана из-за вашей предыдущей конфигурации.

3. Я понимаю. Я думал, что удаление очереди также приведет к удалению exchange, но это было не так. Когда я удалил exchange вручную, это сработало. Спасибо!