#java #spring-boot #rabbitmq #amqp #spring-cloud-stream
#java #пружинный ботинок #rabbitmq #amqp #spring-cloud-stream
Вопрос:
Я хотел бы знать, каков наилучший способ обработки сообщений с неизвестными / недопустимыми значениями ключа маршрутизации внутри exchange? В моем случае я отправляю все свои сообщения внутри одного и того же exchange, и на основе ключа маршрутизации сообщения направляются в соответствующую очередь. Вот моя конфигурация (я использую Spring Cloud Stream) :
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type
spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2
spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR
Теперь я спрашиваю, что произойдет, если я отправлю сообщение с payload.type='ANY'
помощью ? Очевидно, что это сообщение не будет получено ни одним потребителем и останется внутри exchange, но каков наилучший способ отслеживать эти «неизвестные» сообщения? Могу ли я использовать DLQ для этого?
Спасибо!
Ответ №1:
Вы можете использовать mandatory
свойство для своих издателей.
When a published message cannot be routed to any queue,
and the publisher set the mandatory message property to true, the
message will be returned to it.
The publisher must have a returned message handler set up in order to
handle the return (e.g. by logging an error or retrying with a different exchange)
Комментарии:
1. Похоже, настройка альтернативного обмена — это именно то, что я ищу. Не могли бы вы поделиться примером того, как это сделать, используя Spring Cloud Stream?
2. @Fred Я никогда раньше не работал с потоком Spring Cloud. Я представил основную концепцию. Я думаю, Гэри Рассел может привести вам пример.
Ответ №2:
останется внутри exchange,
Нет; обмены не «удерживают» сообщения, они просто маршрутизаторы.
Сообщения, не подлежащие маршрутизации, по умолчанию отбрасываются.
Вы можете настроить привязку для возврата сообщений, не подлежащих маршрутизации.
См. раздел Каналы ошибок.
Возвраты являются асинхронными.
В предстоящем выпуске 3.1 вы можете подождать в будущем, чтобы определить, было ли сообщение отправлено успешно или нет. См. Подтверждение издателя.
Если сообщение не маршрутизируется, returnedMessage
устанавливается свойство корреляционных данных.
Фреймворк использует mandatory
функцию, упомянутую в другом ответе.
Редактировать
Вот пример:
spring.rabbitmq.publisher-returns: true
spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']
spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {
public static void main(String[] args) {
SpringApplication.run(So65134452Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> {
output.send(MessageBuilder.withPayload("foo")
.setHeader("rk", "invalid")
.build());
};
}
@Autowired
RabbitTemplate template;
@Bean
public Queue unroutable() {
return new Queue("unroutable.messages");
}
@ServiceActivator(inputChannel = "errorChannel")
public void error(Message<?> error) {
if (error.getPayload() instanceof ReturnedAmqpMessageException) {
this.template.send(unroutable().getName(),
((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
}
}
}
Комментарии:
1. Спасибо за разъяснения по обмену. В моем случае я бы хотел, чтобы сообщение автоматически перенаправлялось в альтернативную очередь (очередь «без маршрутизации»). Это то, что вы имели в виду, говоря «настроить привязку для возврата непрочитываемых сообщений»? Можно ли настроить это поведение, используя только Spring Cloud Stream? У вас случайно нет примера того, как этого добиться? Спасибо!
2. @Fred Для этого ничего не встроено, но это возможно с помощью нескольких строк конфигурации и небольшого количества Java. Я добавлю пример утром по времени восточного побережья США.
3. Хорошо, большое спасибо, я с нетерпением жду этого … этот пример был бы очень признателен.
4. Обратите внимание, что это
errorChannel
приводит к ошибкам во всех привязках; если у вас несколько выходных привязок и вы хотите отправлять непроходимые файлы в разные очереди, используйте@ServiceActivator(inputChannel = "my-exchange.errors")
вместо этого.5. Повторная публикация должна выполняться на стороне производителя (куда отправляется возвращенное сообщение); вы, конечно, можете настроить привязку потребителя в своем потребительском приложении для получения сообщений, не подлежащих маршрутизации.