Как настроить «сквозное» подтверждение издателя с помощью Spring AMQP

#rabbitmq #spring-amqp

#rabbitmq #spring-amqp

Вопрос:

Я хочу использовать publisher confirms с помощью RabbitMQ и Spring AMQP таким образом, чтобы обратный вызов подтверждения сообщения получал NACK, если слушатель выдает исключение во время обработки сообщения.

После этого сообщения в блоге я говорю о варианте использования, отмеченном красным цветом:

введите описание изображения здесь

Главный вопрос заключается в следующем:

  1. как мне настроить ConnectionFactory, RabbitTemplate и ListenerContainer для включения ручного NACKs?
  2. что мне нужно сделать в моем прослушивателе, чтобы ПЕРЕХВАТИТЬ сообщение и вызвать обратный вызов подтверждения success = false в случае исключения?

Вот мои бобы:

 @Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setPublisherConfirms(true);
    return connectionFactory;
}

@Bean
public ConfirmCallback confirmCallback() {
    return new ConfirmCallbackTestImplementation();
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, ConfirmCallback confirmCallback) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setExchange(DIRECT_EXCHANGE);
    return rabbitTemplate;
}

@Bean
public FaultyMessageListener faultyListener(RabbitAdmin rabbitAdmin, DirectExchange exchange, ConnectionFactory connectionFactory) {
    Queue queue = queue(rabbitAdmin, exchange, "faultyListener");
    FaultyMessageListener listener = new FaultyMessageListener();
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setMessageListener(listener);
    container.setQueues(queue);
    container.setDefaultRequeueRejected(false);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    container.start();
    return listener;
}

private Queue queue(RabbitAdmin rabbitAdmin, DirectExchange exchange, String routingKey) {
    Queue queue = new Queue(routingKey, true, false, true);
    rabbitAdmin.declareQueue(queue);
    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(routingKey));
    return queue;
}
 

Вот моя реализация Слушателя:

 public class FaultyMessageListener implements ChannelAwareMessageListener {

    private final List<Message> receivedMessages = new ArrayList<>();

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        receivedMessages.add(message);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        latch.countDown();
        throw new AmqpException("Message could not be processed");
    }

}
 

Вот мой обратный вызов подтверждения:

 public static class ConfirmCallbackTestImplementation implements ConfirmCallback {

    private volatile Map<String, Boolean> confirmations = new HashMap<>();
    private volatile HashMap<String, CountDownLatch> expectationLatches = new HashMap<>();

    @Override
    public void confirm(CorrelationData correlationData, boolean success, String s) {
        confirmations.put(correlationData.getId(), success);
        expectationLatches.get(correlationData.getId()).countDown();
    }

    public CountDownLatch expect(String correlationId) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectationLatches.put(correlationId, latch);
        return latch;
    }

}
 

Затем я использую следующий тест, чтобы проверить желаемое поведение:

 @Autowired
private RabbitTemplate template;

@Autowired
private FaultyMessageListener faultyListener;

@Autowired
private ConfirmCallbackTestImplementation testConfirmCallback;

@Test
public void sendMessageToFaultyMessageListenerResultsInNack() throws InterruptedException {
    String correlationId = "corr-data-test-2";
    CountDownLatch confirmationLatch = testConfirmCallback.expect(correlationId);

    template.convertAndSend("ConnectionsTests.PublisherConfirm", "faultyListener", "faulty message", new CorrelationData(correlationId));

    assertTrue(faultyListener.latch.await(1, TimeUnit.SECONDS));
    confirmationLatch.await(1, TimeUnit.SECONDS);

    assertThat(faultyListener.receivedMessages.size(), is(1));
    assertThat(testConfirmCallback.confirmations.get(correlationId), is(false));
}
 

Результатом теста является:

 java.lang.AssertionError: 
    Expected: is <false>
         but: was <true>
 

для последнего утверждения. Для меня это звучит так, как будто обратный вызов подтверждения всегда вызывается с success = true помощью вместо success = false того, что я ожидал бы от channel.basicNack(...) in my listener .

Ответ №1:

Это так не работает; ack / nack на стороне издателя зависит исключительно от того, принял ли брокер сообщение. Фактически, nack редко возвращается, поскольку это означает проблему в самом брокере — см. Документацию rabbit .

basic.nack будет доставлен только в том случае, если в процессе Erlang, ответственном за очередь, возникает внутренняя ошибка.

Аналогично, ack / nack на стороне потребителя зависит исключительно от того, принял ли потребитель ответственность за сообщение, и nack позволяет запрашивать сообщение, отбрасывать его или перенаправлять в очередь мертвых писем.

После публикации сообщения обратная связь с издателем от потребителя (потребителей) отсутствует. Если вам нужна такая связь, вам необходимо настроить очереди ответов.

Вместо этого вы можете использовать Spring Remoting (RPC) поверх RabbitMQ, если хотите тесной связи между издателем и потребителем. Если потребитель выдает исключение, оно будет передано обратно издателю — однако этот механизм поддерживает только Serializable объекты Java.

Хотя документация ссылается на XML, вы можете подключить прокси-сервер и вызывающий сервис как @Bean s