Как получить доступ к исходным собственным заголовкам в потоке SCDF?

#spring-kafka #spring-cloud-stream #producer-consumer #spring-integration-mqtt #spring-integration-jdbc

Вопрос:

Я использую SCDF для разработки потока, который позволяет сохранять все сообщения MQTT в базе данных SQL.

Это код, используемый для создания потока

 stream create --name mqtt-to-jdbc  --definition "mqtt --qos=2 --topics='#' --username=admin --password=******** --url='tcp://192.168.1.153:60065' | jdbc --username=sa --password=******** --driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver --url='jdbc:sqlserver://192.168.1.18;databaseName=test_db;schema=dbo' --table-name=mqtt_message --columns="headers:headers.toString(),payload:payload.toString(),created_at:new java.sql.Timestamp(T(System).currentTimeMillis()).toString()"" --deploy
 

Таблица mqtt_message содержит несколько столбцов, среди которых заголовки, полезная нагрузка, received_topic.
Поток успешно развернут, и данные сохраняются, однако:
Столбец заголовки извлекается с помощью команды SpEL headers.toString():

 {b3=d4840635cb8c968c-381e88a613735a05-1, nativeHeaders={}, errorChannel=, id=e34b08d5-eafe-decd-4aa1-634cb187889a, timestamp=1622532820049}
 

Столбцы полезной нагрузки извлекаются с помощью полезной нагрузки SpEL.toString():

 Test payload
 

Как вы можете видеть, значение из заголовков не включает предполагаемые заголовки, включая тему сообщения (mqtt_receivedTopic).

Если я предоставлю реализацию для производителя и приемника, я смогу получить доступ к следующим заголовкам сообщений:

 Headers:{
    mqtt_id=0, 
    deliveryAttempt=1, 
    kafka_timestampType=CREATE_TIME, 
    kafka_receivedTopic=reaper.reaper-source, 
    mqtt_receivedRetained=false, 
    kafka_offset=31, 
    mqtt_duplicate=false, 
    scst_nativeHeadersPresent=true, 
    kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19da7b97, 
    id=cc423e5f-af1e-173a-a9ac-c229ab544738, 
    kafka_receivedPartitionId=0, 
    mqtt_receivedTopic=test/topic, 
    contentType=application/json, 
    kafka_receivedTimestamp=1622453609998, 
    mqtt_receivedQos=0, 
    kafka_groupId=reaper, 
    timestamp=1622453610004
}
 

Я также протестировал следующие свойства, но ни одно из них не изменило результат:

Производитель

  • spring.cloud.stream.привязки.вывод.производитель.headerMode=встроенные заголовки
  • spring.cloud.stream.привязки.вывод.производитель.useNativeEncoding=true

Потребитель

  • spring.cloud.stream.default.consumer.headerMode=встроенные заголовки

Есть ли какой-либо способ передать собственные заголовки между производителем и приемником и записать их в столбцы назначения (извлечение значения из полученной темы).

Спасибо.