повторная отправка сообщения в очередь RabbitMQ создает бесконечный цикл

#rabbitmq #spring-integration #spring-amqp

Вопрос:

У меня есть очередь RabbitMQ для хранения необработанных сообщений. Я счастливый путь, я прочитаю сообщение из очереди, обработаю его и удалю сообщение в очереди. Но если при обработке будут соблюдены определенные критерии, мне придется снова отправить сообщение в очередь. Я использую опрашиваемый адаптер канала для получения сообщения. поскольку я хочу получить все доступные сообщения в этой очереди во время опроса, я установил значение maxMessagesPerPoll равным -1. Это приводит к тому, что код переходит в бесконечный цикл. после повторной отправки сообщения в очередь входящий опрошенный адаптер немедленно принимает его. Как я могу предотвратить эту ситуацию?

Есть ли какой-либо способ задержать доставку сообщения или мы можем ограничить обработку сообщений один раз для каждого сообщения в одном опросе InboundPolledAdapter. Каков будет наилучший подход?

Встроенный адаптер-это,

 @Bean
public IntegrationFlow inboundIntegrationFlowPaymentRetry() {
    return IntegrationFlows
            .from(Amqp.inboundPolledAdapter(connectionFactory, RetryQueue),
                    e -> e.poller(Pollers.fixedDelay(20_000).maxMessagesPerPoll(-1)).autoStartup(true))
            .handle(message -> {
                channelRequestFromQueue()
                        .send(MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders())
                                .setHeader(IntegrationConstants.QUEUED_MESSAGE, message).build());
            }).get();
}
 

Для первой отправки первого сообщения в очередь по,

 @Bean
Binding bindingRetryQueue() {
    return BindingBuilder.bind(queueRetry()).to(exchangeRetry())
            .with(ProcessQueuedMessageService.RETRY_ROUTING_KEY);
}

@Bean
TopicExchange exchangeRetry() {
    return new TopicExchange(ProcessQueuedMessageService.RETRY_EXCHANGE);
}

@Bean
Queue queueRetry() {
    return new Queue(RetryQueue, false);
}

@Bean
@ServiceActivator(inputChannel = "channelAmqpOutbound")
public AmqpOutboundEndpoint outboundAmqp(AmqpTemplate amqpTemplate) {
    final AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey(RetryQueue);
    return outbound;
}
 

Повторная публикация сообщения с помощью,

 StaticMessageHeaderAccessor.getAcknowledgmentCallback(requeueMessage).acknowledge(Status.REQUEUE);
 

Ответ №1:

Есть ли какой-либо способ задержать доставку сообщения

Смотрите Функцию отложенного обмена в Rabbit MQ и ее API в весеннем AMQP: https://docs.spring.io/spring-amqp/docs/current/reference/html/#delayed-message-exchange

ограничьте обработку сообщений один раз для каждого сообщения

Для этого сценария вы можете изучить шаблон идемпотентного приемника и его реализацию в весенней интеграции: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#idempotent-receiver.

У повторно доставленного сообщения будет AmqpHeaders.REDELIVERED заголовок. Смотрите больше в документах: https://www.rabbitmq.com/reliability.html#consumer-side