Как обрабатывать неизвестные / недопустимые значения ключа маршрутизации привязки в RabbitMQ?

#java #spring-boot #rabbitmq #amqp #spring-cloud-stream

#java #пружинный ботинок #rabbitmq #amqp #spring-cloud-stream

Вопрос:

Я хотел бы знать, каков наилучший способ обработки сообщений с неизвестными / недопустимыми значениями ключа маршрутизации внутри exchange? В моем случае я отправляю все свои сообщения внутри одного и того же exchange, и на основе ключа маршрутизации сообщения направляются в соответствующую очередь. Вот моя конфигурация (я использую Spring Cloud Stream) :

 spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type

spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2

spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR
 

Теперь я спрашиваю, что произойдет, если я отправлю сообщение с payload.type='ANY' помощью ? Очевидно, что это сообщение не будет получено ни одним потребителем и останется внутри exchange, но каков наилучший способ отслеживать эти «неизвестные» сообщения? Могу ли я использовать DLQ для этого?

Спасибо!

Ответ №1:

Вы можете использовать mandatory свойство для своих издателей.

 When a published message cannot be routed to any queue, 
and the publisher set the mandatory message property to true, the 
message will be returned to it.
The publisher must have a returned message handler set up in order to 
handle the return (e.g. by logging an error or retrying with a different exchange)
 

Комментарии:

1. Похоже, настройка альтернативного обмена — это именно то, что я ищу. Не могли бы вы поделиться примером того, как это сделать, используя Spring Cloud Stream?

2. @Fred Я никогда раньше не работал с потоком Spring Cloud. Я представил основную концепцию. Я думаю, Гэри Рассел может привести вам пример.

Ответ №2:

останется внутри exchange,

Нет; обмены не «удерживают» сообщения, они просто маршрутизаторы.

Сообщения, не подлежащие маршрутизации, по умолчанию отбрасываются.

Вы можете настроить привязку для возврата сообщений, не подлежащих маршрутизации.

См. раздел Каналы ошибок.

Возвраты являются асинхронными.

В предстоящем выпуске 3.1 вы можете подождать в будущем, чтобы определить, было ли сообщение отправлено успешно или нет. См. Подтверждение издателя.

Если сообщение не маршрутизируется, returnedMessage устанавливается свойство корреляционных данных.

Фреймворк использует mandatory функцию, упомянутую в другом ответе.

Редактировать

Вот пример:

 spring.rabbitmq.publisher-returns: true

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']

spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
 
 @SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {

    public static void main(String[] args) {
        SpringApplication.run(So65134452Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(MessageBuilder.withPayload("foo")
                    .setHeader("rk", "invalid")
                    .build());
        };
    }

    @Autowired
    RabbitTemplate template;

    @Bean
    public Queue unroutable() {
        return new Queue("unroutable.messages");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void error(Message<?> error) {
        if (error.getPayload() instanceof ReturnedAmqpMessageException) {
            this.template.send(unroutable().getName(),
                    ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
        }
    }

}
 

Комментарии:

1. Спасибо за разъяснения по обмену. В моем случае я бы хотел, чтобы сообщение автоматически перенаправлялось в альтернативную очередь (очередь «без маршрутизации»). Это то, что вы имели в виду, говоря «настроить привязку для возврата непрочитываемых сообщений»? Можно ли настроить это поведение, используя только Spring Cloud Stream? У вас случайно нет примера того, как этого добиться? Спасибо!

2. @Fred Для этого ничего не встроено, но это возможно с помощью нескольких строк конфигурации и небольшого количества Java. Я добавлю пример утром по времени восточного побережья США.

3. Хорошо, большое спасибо, я с нетерпением жду этого … этот пример был бы очень признателен.

4. Обратите внимание, что это errorChannel приводит к ошибкам во всех привязках; если у вас несколько выходных привязок и вы хотите отправлять непроходимые файлы в разные очереди, используйте @ServiceActivator(inputChannel = "my-exchange.errors") вместо этого.

5. Повторная публикация должна выполняться на стороне производителя (куда отправляется возвращенное сообщение); вы, конечно, можете настроить привязку потребителя в своем потребительском приложении для получения сообщений, не подлежащих маршрутизации.