Интеграция Spring Mqtt: проблема с исходящей темой

#spring-integration #mqtt #mosquitto

#spring-интеграция #mqtt #mosquitto

Вопрос:

Привет, я пытаюсь использовать интеграцию spring для получения сообщений MQTT, их обработки и публикации в другой теме.

Вот integration.xml:

    <int-mqtt:outbound-channel-adapter id="mqtt-publish"
    client-id="spring-foo-1"
    client-factory="clientFactory"
    auto-startup="true"
    url="tcp://localhost:1883"
    default-qos="0"
    default-retained="true"
    default-topic="tweets/akki" />

   <int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="spring-foo-2"
    client-factory="clientFactory"
    auto-startup="true"
    url="tcp://localhost:1883"
    topics="mqtt/publish"
    />

    <int:service-activator input-channel="oneTopicAdapter" method="logMessages" ref="mqttLogger" output-channel="mqtt-publish"></int:service-activator>

    <bean id="mqttLogger" class="hello.mqttReceiver" />
 

И mqttReceiver.java:

 package hello;
public class mqttReceiver {
   public String logMessages(String a){
       String processed_data = a; //TODO Process Data
       return processed_data;
   }
}
 

Ниже приведены проблемы, с которыми я сталкиваюсь:

  • processed_data Перенаправляется в mqtt / publish, а не в mqtt / akki
  • processed_data Не публикуются, но много раз

Ответ №1:

Это правильно, потому AbstractMqttMessageHandler что в первую очередь рассматривается headers :

 String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
    Object mqttMessage = this.converter.fromMessage(message, Object.class);
    if (topic == null amp;amp; this.defaultTopic == null) {
        throw new MessageHandlingException(message,
                "No '"   MqttHeaders.TOPIC   "' header and no default topic defined");
    }
 

Когда DefaultPahoMessageConverter заполняется этот MqttHeaders.TOPIC заголовок MqttPahoMessageDrivenChannelAdapter при поступлении сообщения.

Вы должны рассмотреть возможность использования <int:header-filter header-names="mqtt_topic"/> перед отправкой сообщения на <int-mqtt:outbound-channel-adapter>

Комментарии:

1. Вы также можете использовать расширитель заголовка для замены заголовка темы (установите значение перезаписи true).