Как получить идентификатор сообщения служебной шины Azure при отправке сообщения в тему с помощью интеграции Spring

#spring-integration #azureservicebus #azure-spring-cloud

Вопрос:

После отправки сообщения в тему на служебной шине Azure с помощью Spring Integration я хотел бы получить идентификатор сообщения, который генерирует Azure. Я могу сделать это с помощью JMS. Есть ли способ сделать это с помощью Spring Integration? Код, с которым я работаю:

 @Service
public class ServiceBusDemo {
    private static final String OUTPUT_CHANNEL = "topic.output";
    private static final String TOPIC_NAME = "my_topic";

    @Autowired
    TopicOutboundGateway messagingGateway;

    public String send(String message) {
        // How can I get the Azure message id after sending here?
        this.messagingGateway.send(message);
        return message;
    }

    @Bean
    @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
    public MessageHandler topicMessageSender(ServiceBusTopicOperation topicOperation) {
        DefaultMessageHandler handler = new DefaultMessageHandler(TOPIC_NAME, topicOperation);
        handler.setSendCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(Void result) {
                System.out.println("Message was sent successfully to service bus.");
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("There was an error sending the message to service bus.");
            }
        });

        return handler;
    }

    @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
    public interface TopicOutboundGateway {
        void send(String text);
    }
}
 

Комментарии:

1. Как вы делаете это с помощью JMS? Что это такое DefaultMessageHandler ? Это из служебной шины Azure?

Ответ №1:

Вы можете использовать ChannelInterceptor для получения заголовков сообщений:

 public class CustomChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        //key of the message-id header is not stable, you should add logic here to check which header key should be used here.
        //ref: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/spring/azure-spring-cloud-starter-servicebus#support-for-service-bus-message-headers-and-properties
        String messageId = message.getHeaders().get("message-id-header-key").toString();

        return ChannelInterceptor.super.preSend(message, channel);
    }

}
 

Затем в конфигурации установите этот перехватчик на свой канал

     @Bean(name = OUTPUT_CHANNEL)
    public BroadcastCapableChannel pubSubChannel() {
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        channel.setInterceptors(Arrays.asList(new CustomChannelInterceptor()));
        return channel;
    }