Получение «У диспетчера нет подписчиков для jms-канала» с использованием Jms Pub / Sub с интеграцией Spring с использованием DSL

#spring-integration #ibm-mq #spring-integration-dsl

#spring-интеграция #ibm-mq #spring-integration-dsl

Вопрос:

Я создал код на основе документации Java DSL для интеграции Spring. Когда я запускаю код, я продолжаю получать следующее предупреждение, хотя журналы также предполагают, что сообщение было успешно передано обоим подписчикам.

 
2021-03-04 17:21:25.589  WARN 46929 --- [enerContainer-1] bleJmsChannel$DispatchingMessageListener : Dispatcher has no subscribers for jms-channel 'application.jmsPublishSubscribeChannel'.

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:153) ~[spring-integration-core-5.4.3.jar:5.4.3]
    at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:229) ~[spring-integration-jms-5.4.4.jar:5.4.4]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) ~[spring-jms-5.3.3.jar:5.3.3]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) ~[spring-jms-5.3.3.jar:5.3.3]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

2021-03-04 17:21:26.598  INFO 46929 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload={"Greeting":"Hello from Node at Thu Mar 04 2021 17:21:25 GMT 0000 (Greenwich Mean Time)"}, headers={id=50ab26ae-7a3d-8a2e-f694-94928b5097d6, timestamp=1614878485580}]


 

У меня есть @EnableIntegration в моем приложении, и мой компонент pub / sub выглядит так:

 import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.jms.ConnectionFactory;
import java.util.concurrent.TimeUnit;

@Component
public class MessageFlowPub {

    protected final Log logger = LogFactory.getLog(getClass());

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public BroadcastCapableChannel jmsPublishSubscribeChannel() {
        return Jms.publishSubscribeChannel(connectionFactory)
                .destination("dev/")
                .get();
    }

    @Bean
    public IntegrationFlow pubSubFlow() {
        return f -> f
                .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                        pubsub -> pubsub
                                .subscribe(subFlow -> subFlow
                                        .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                                .subscribe(subFlow -> subFlow
                                        .channel(c -> c.queue("jmsPubSubBridgeChannel2"))))
                .log()
                .handle(System.out::println)
                ;
    }

    @Bean
    public IntegrationFlow msgHandler1() {
        return IntegrationFlows.from("jmsPubSubBridgeChannel1")
                .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
                .log()
                .handle(System.out::println)
                .get();
    }

    @Bean
    public IntegrationFlow msgHandler2() {
        return IntegrationFlows.from("jmsPubSubBridgeChannel2")
                .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
                .log()
                .handle(System.out::println)
                .get();
    }
    
}

 

Предупреждающее сообщение наводит меня на мысль, что я делаю что-то в корне неправильное, но я не вижу, что.

Комментарии:

1. Пожалуйста, покажите больше трассировки стека для этого предупреждения. Плюс покажите нам код, которому вы отправляете сообщения jmsPublishSubscribeChannel .

2. @ArtemBilan Я добавил больше стека к этому вопросу. Сообщение поступает из Node.js приложение , опубликованное в теме MQ dev/ . Вы можете увидеть это внизу трассировки.

3. Попробуйте добавить autoStartup(false) к этому jmsPublishSubscribeChannel определению. Согласно вашим журналам, похоже, что он начинает получать сообщения из темы слишком рано: до добавления этих подписчиков. Однако следующие журналы подтверждают, что в конечном итоге это все равно произойдет…

4. @ArtemBilan autoStartup(false) избавляется от предупреждения, но также отключает канал, чтобы ничего не поступало.

5. Да… Я понимаю. Другой способ — удалить @Bean это jmsPublishSubscribeChannel и создать анализатор DSL, который позаботится о регистрации компонентов и так далее.

Ответ №1:

Хорошо, я нашел, в чем проблема.

Вы используете @Component @Bean комбинацию методов и. Более того, вы пытаетесь вызвать один метод bean из другого: .publishSubscribeChannel(jmsPublishSubscribeChannel() . Это невозможно вне @Configuration класса. Конфигурация аннотации только с @Component помощью считается «легковесной», и поэтому мы не можем вызывать bean-методы друг от друга — они просто не проксируются для обеспечения правильного внедрения зависимостей через вызов метода.

Это должно сработать и для вас:

 @Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(connectionFactory)
            .destination("dev/")
            .get();
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
 

Пожалуйста, прочитайте больше о облегченной конфигурации и proxyBeanMethods = false :
https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-java-basic-concepts

Ответ №2:

Благодаря @ArtemBilan , я избавился от предупреждения , удалив @Bean для BroadcastCapableChannel

 
    // @Bean
    public BroadcastCapableChannel jmsPublishSubscribeChannel() {
        return Jms.publishSubscribeChannel(connectionFactory)
                .destination("dev/")
                .get();
    }
}

 

Комментарии:

1. Я не думаю, что это решает основную проблему. Я бы определенно перенес jmsPublishSubscribeChannel определение в DSL вместо @Bean . Таким образом, Spring Integration позаботится о правильном порядке запуска компонентов. Вам просто повезло, что ваш AppComponents обработан после MessageFlowPub , поэтому компоненты на том же этапе запускаются в ожидаемом порядке. Это может сломаться в другой среде или при добавлении дополнительных компонентов в ваш проект. Это определенно может быть проблемой на уровне фреймворка, но одновременно ее может быть сложно решить…

2. @ArtemBilan Так что-то похожее IntegrationFlows.from(jmsPublishSubscribeChannel) ?

3. Что-то вроде просто удалить @Bean из jmsPublishSubscribeChannel() метода.

4. @ArtemBilan это тоже работает. Я изменю ответ.