Как прослушивать ответное сообщение rpc rabbitmq

#spring-boot #rabbitmq #spring-amqp #spring-rabbit #rabbitmq-exchange

Вопрос:

MessageListenerContainer с MessageListenerAdapter реализованным как

 void handleMessage(MyRpcRequest request, MessageProperties messageProperties) {
        var correlationData = new CorrelationData(messageProperties.getCorrelationId());
        MessagePostProcessor messagePostProcessor = (m) -> {
            m.getMessageProperties().setCorrelationId(correlationData.getId());
            if (messagePostProcessorFactory != null) {
                messagePostProcessorFactory.create(exchange, routingKey).postProcessMessage(m);
            }
            return m;
        };
        MyRpcResponse response = computeResponse(request);
    rabbitTemplate.convertAndSend("", messageProperties.getReplyTo(), response, messagePostProcessor, correlationData);
}
 

Я отправляю ответ прямо на обмен по умолчанию — и он отлично работает — получен ответ rpc (проще просто вернуть ответ, не отправляя его обратно с шаблоном кролика, но я специально отправляю его вручную). Но таким образом я не смогу прослушивать ответные сообщения. Что я хотел бы сделать, так это сделать, как показано ниже — отправка в RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE, который является разветвленным обменом — я могу прослушивать сообщения с него, а затем я хотел бы, чтобы он перенаправлял сообщения на обмен по умолчанию — Я пытаюсь привязать свой обмен ответами к обмену по умолчанию, но это не работает

 void handleMessage(MyRpcRequest request, MessageProperties messageProperties) {
    ....
    rabbitTemplate.convertAndSend("RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE",
        messageProperties.getReplyTo(), response, messagePostProcessor, correlationData); 
    ....
}

    @Bean
    FanoutExchange defaultExchange() {
        return new FanoutExchange("");
    }

    @Bean
    FanoutExchange myRpcResponseExchange() {
        return new FanoutExchange("RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE");
    }

    @Bean
    Binding bindMyRpcResponseExchangeToDefaultExchange(FanoutExchange myRpcResponseExchange, FanoutExchange defaultExchange) {
        return BindingBuilder.bind(myRpcResponseExchange).to(defaultExchange);
    }
 

Как я могу исправить свой код, чтобы я мог прослушивать ответные сообщения?

Как я могу прослушивать ответные сообщения на запросы, которые отправляются как:

rabbitTemplate.convertSendAndReceiveAsType(exchange, routingKey, message, messagePostProcessor, correlationData, responseType); ?

Комментарии:

1. Я вижу проблему — обмен по умолчанию-это DirectExchange, а не FanoutExchange. Он должен быть связан с ключом маршрутизации. Когда я пытаюсь привязать myResponseExchange к обмену по умолчанию с пустым ключом маршрутизации, это не работает.

2. изменение направления привязки, то есть: возврат BindingBuilder.bind(defaultExchange).to(twiceRpcResponseExchange); также не помогает

Ответ №1:

Вам не разрешается вручную привязывать что-либо к обмену по умолчанию.

Смотрите пользовательский интерфейс управления — там нет опции «привязать».

Вы также не можете привязать обмен по умолчанию к другому.

Вам нужно опубликовать в разветвителе и привязать к нему очередь ответов.