#rabbitmq #spring-integration #spring-amqp
Вопрос:
У меня есть очередь RabbitMQ для хранения необработанных сообщений. Я счастливый путь, я прочитаю сообщение из очереди, обработаю его и удалю сообщение в очереди. Но если при обработке будут соблюдены определенные критерии, мне придется снова отправить сообщение в очередь. Я использую опрашиваемый адаптер канала для получения сообщения. поскольку я хочу получить все доступные сообщения в этой очереди во время опроса, я установил значение maxMessagesPerPoll равным -1. Это приводит к тому, что код переходит в бесконечный цикл. после повторной отправки сообщения в очередь входящий опрошенный адаптер немедленно принимает его. Как я могу предотвратить эту ситуацию?
Есть ли какой-либо способ задержать доставку сообщения или мы можем ограничить обработку сообщений один раз для каждого сообщения в одном опросе InboundPolledAdapter. Каков будет наилучший подход?
Встроенный адаптер-это,
@Bean
public IntegrationFlow inboundIntegrationFlowPaymentRetry() {
return IntegrationFlows
.from(Amqp.inboundPolledAdapter(connectionFactory, RetryQueue),
e -> e.poller(Pollers.fixedDelay(20_000).maxMessagesPerPoll(-1)).autoStartup(true))
.handle(message -> {
channelRequestFromQueue()
.send(MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders())
.setHeader(IntegrationConstants.QUEUED_MESSAGE, message).build());
}).get();
}
Для первой отправки первого сообщения в очередь по,
@Bean
Binding bindingRetryQueue() {
return BindingBuilder.bind(queueRetry()).to(exchangeRetry())
.with(ProcessQueuedMessageService.RETRY_ROUTING_KEY);
}
@Bean
TopicExchange exchangeRetry() {
return new TopicExchange(ProcessQueuedMessageService.RETRY_EXCHANGE);
}
@Bean
Queue queueRetry() {
return new Queue(RetryQueue, false);
}
@Bean
@ServiceActivator(inputChannel = "channelAmqpOutbound")
public AmqpOutboundEndpoint outboundAmqp(AmqpTemplate amqpTemplate) {
final AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setRoutingKey(RetryQueue);
return outbound;
}
Повторная публикация сообщения с помощью,
StaticMessageHeaderAccessor.getAcknowledgmentCallback(requeueMessage).acknowledge(Status.REQUEUE);
Ответ №1:
Есть ли какой-либо способ задержать доставку сообщения
Смотрите Функцию отложенного обмена в Rabbit MQ и ее API в весеннем AMQP: https://docs.spring.io/spring-amqp/docs/current/reference/html/#delayed-message-exchange
ограничьте обработку сообщений один раз для каждого сообщения
Для этого сценария вы можете изучить шаблон идемпотентного приемника и его реализацию в весенней интеграции: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#idempotent-receiver.
У повторно доставленного сообщения будет AmqpHeaders.REDELIVERED
заголовок. Смотрите больше в документах: https://www.rabbitmq.com/reliability.html#consumer-side