#spring-integration #mqtt #mosquitto
#spring-интеграция #mqtt #mosquitto
Вопрос:
Привет, я пытаюсь использовать интеграцию spring для получения сообщений MQTT, их обработки и публикации в другой теме.
Вот integration.xml:
<int-mqtt:outbound-channel-adapter id="mqtt-publish"
client-id="spring-foo-1"
client-factory="clientFactory"
auto-startup="true"
url="tcp://localhost:1883"
default-qos="0"
default-retained="true"
default-topic="tweets/akki" />
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="spring-foo-2"
client-factory="clientFactory"
auto-startup="true"
url="tcp://localhost:1883"
topics="mqtt/publish"
/>
<int:service-activator input-channel="oneTopicAdapter" method="logMessages" ref="mqttLogger" output-channel="mqtt-publish"></int:service-activator>
<bean id="mqttLogger" class="hello.mqttReceiver" />
И mqttReceiver.java:
package hello;
public class mqttReceiver {
public String logMessages(String a){
String processed_data = a; //TODO Process Data
return processed_data;
}
}
Ниже приведены проблемы, с которыми я сталкиваюсь:
processed_data
Перенаправляется в mqtt / publish, а не в mqtt / akkiprocessed_data
Не публикуются, но много раз
Ответ №1:
Это правильно, потому AbstractMqttMessageHandler
что в первую очередь рассматривается headers
:
String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
Object mqttMessage = this.converter.fromMessage(message, Object.class);
if (topic == null amp;amp; this.defaultTopic == null) {
throw new MessageHandlingException(message,
"No '" MqttHeaders.TOPIC "' header and no default topic defined");
}
Когда DefaultPahoMessageConverter
заполняется этот MqttHeaders.TOPIC
заголовок MqttPahoMessageDrivenChannelAdapter
при поступлении сообщения.
Вы должны рассмотреть возможность использования <int:header-filter header-names="mqtt_topic"/>
перед отправкой сообщения на <int-mqtt:outbound-channel-adapter>
Комментарии:
1. Вы также можете использовать расширитель заголовка для замены заголовка темы (установите значение перезаписи true).