#spring-integration #ibm-mq #spring-integration-dsl
#spring-интеграция #ibm-mq #spring-integration-dsl
Вопрос:
Я создал код на основе документации Java DSL для интеграции Spring. Когда я запускаю код, я продолжаю получать следующее предупреждение, хотя журналы также предполагают, что сообщение было успешно передано обоим подписчикам.
2021-03-04 17:21:25.589 WARN 46929 --- [enerContainer-1] bleJmsChannel$DispatchingMessageListener : Dispatcher has no subscribers for jms-channel 'application.jmsPublishSubscribeChannel'.
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:153) ~[spring-integration-core-5.4.3.jar:5.4.3]
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:229) ~[spring-integration-jms-5.4.4.jar:5.4.4]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:761) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:699) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) ~[spring-jms-5.3.3.jar:5.3.3]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) ~[spring-jms-5.3.3.jar:5.3.3]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-03-04 17:21:26.598 INFO 46929 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload={"Greeting":"Hello from Node at Thu Mar 04 2021 17:21:25 GMT 0000 (Greenwich Mean Time)"}, headers={id=50ab26ae-7a3d-8a2e-f694-94928b5097d6, timestamp=1614878485580}]
У меня есть @EnableIntegration
в моем приложении, и мой компонент pub / sub выглядит так:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
import java.util.concurrent.TimeUnit;
@Component
public class MessageFlowPub {
protected final Log logger = LogFactory.getLog(getClass());
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
@Bean
public IntegrationFlow pubSubFlow() {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel(),
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))))
.log()
.handle(System.out::println)
;
}
@Bean
public IntegrationFlow msgHandler1() {
return IntegrationFlows.from("jmsPubSubBridgeChannel1")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.log()
.handle(System.out::println)
.get();
}
@Bean
public IntegrationFlow msgHandler2() {
return IntegrationFlows.from("jmsPubSubBridgeChannel2")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.log()
.handle(System.out::println)
.get();
}
}
Предупреждающее сообщение наводит меня на мысль, что я делаю что-то в корне неправильное, но я не вижу, что.
Комментарии:
1. Пожалуйста, покажите больше трассировки стека для этого предупреждения. Плюс покажите нам код, которому вы отправляете сообщения
jmsPublishSubscribeChannel
.2. @ArtemBilan Я добавил больше стека к этому вопросу. Сообщение поступает из Node.js приложение , опубликованное в теме MQ
dev/
. Вы можете увидеть это внизу трассировки.3. Попробуйте добавить
autoStartup(false)
к этомуjmsPublishSubscribeChannel
определению. Согласно вашим журналам, похоже, что он начинает получать сообщения из темы слишком рано: до добавления этих подписчиков. Однако следующие журналы подтверждают, что в конечном итоге это все равно произойдет…4. @ArtemBilan
autoStartup(false)
избавляется от предупреждения, но также отключает канал, чтобы ничего не поступало.5. Да… Я понимаю. Другой способ — удалить
@Bean
этоjmsPublishSubscribeChannel
и создать анализатор DSL, который позаботится о регистрации компонентов и так далее.
Ответ №1:
Хорошо, я нашел, в чем проблема.
Вы используете @Component
@Bean
комбинацию методов и. Более того, вы пытаетесь вызвать один метод bean из другого: .publishSubscribeChannel(jmsPublishSubscribeChannel()
. Это невозможно вне @Configuration
класса. Конфигурация аннотации только с @Component
помощью считается «легковесной», и поэтому мы не можем вызывать bean-методы друг от друга — они просто не проксируются для обеспечения правильного внедрения зависимостей через вызов метода.
Это должно сработать и для вас:
@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel,
Пожалуйста, прочитайте больше о облегченной конфигурации и proxyBeanMethods = false
:
https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-java-basic-concepts
Ответ №2:
Благодаря @ArtemBilan , я избавился от предупреждения , удалив @Bean
для BroadcastCapableChannel
// @Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(connectionFactory)
.destination("dev/")
.get();
}
}
Комментарии:
1. Я не думаю, что это решает основную проблему. Я бы определенно перенес
jmsPublishSubscribeChannel
определение в DSL вместо@Bean
. Таким образом, Spring Integration позаботится о правильном порядке запуска компонентов. Вам просто повезло, что вашAppComponents
обработан послеMessageFlowPub
, поэтому компоненты на том же этапе запускаются в ожидаемом порядке. Это может сломаться в другой среде или при добавлении дополнительных компонентов в ваш проект. Это определенно может быть проблемой на уровне фреймворка, но одновременно ее может быть сложно решить…2. @ArtemBilan Так что-то похожее
IntegrationFlows.from(jmsPublishSubscribeChannel)
?3. Что-то вроде просто удалить
@Bean
изjmsPublishSubscribeChannel()
метода.4. @ArtemBilan это тоже работает. Я изменю ответ.