#spring-kafka #spring-cloud-stream #producer-consumer #spring-integration-mqtt #spring-integration-jdbc
Вопрос:
Я использую SCDF для разработки потока, который позволяет сохранять все сообщения MQTT в базе данных SQL.
Это код, используемый для создания потока
stream create --name mqtt-to-jdbc --definition "mqtt --qos=2 --topics='#' --username=admin --password=******** --url='tcp://192.168.1.153:60065' | jdbc --username=sa --password=******** --driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver --url='jdbc:sqlserver://192.168.1.18;databaseName=test_db;schema=dbo' --table-name=mqtt_message --columns="headers:headers.toString(),payload:payload.toString(),created_at:new java.sql.Timestamp(T(System).currentTimeMillis()).toString()"" --deploy
Таблица mqtt_message содержит несколько столбцов, среди которых заголовки, полезная нагрузка, received_topic.
Поток успешно развернут, и данные сохраняются, однако:
Столбец заголовки извлекается с помощью команды SpEL headers.toString():
{b3=d4840635cb8c968c-381e88a613735a05-1, nativeHeaders={}, errorChannel=, id=e34b08d5-eafe-decd-4aa1-634cb187889a, timestamp=1622532820049}
Столбцы полезной нагрузки извлекаются с помощью полезной нагрузки SpEL.toString():
Test payload
Как вы можете видеть, значение из заголовков не включает предполагаемые заголовки, включая тему сообщения (mqtt_receivedTopic).
Если я предоставлю реализацию для производителя и приемника, я смогу получить доступ к следующим заголовкам сообщений:
Headers:{
mqtt_id=0,
deliveryAttempt=1,
kafka_timestampType=CREATE_TIME,
kafka_receivedTopic=reaper.reaper-source,
mqtt_receivedRetained=false,
kafka_offset=31,
mqtt_duplicate=false,
scst_nativeHeadersPresent=true,
kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19da7b97,
id=cc423e5f-af1e-173a-a9ac-c229ab544738,
kafka_receivedPartitionId=0,
mqtt_receivedTopic=test/topic,
contentType=application/json,
kafka_receivedTimestamp=1622453609998,
mqtt_receivedQos=0,
kafka_groupId=reaper,
timestamp=1622453610004
}
Я также протестировал следующие свойства, но ни одно из них не изменило результат:
Производитель
- spring.cloud.stream.привязки.вывод.производитель.headerMode=встроенные заголовки
- spring.cloud.stream.привязки.вывод.производитель.useNativeEncoding=true
Потребитель
- spring.cloud.stream.default.consumer.headerMode=встроенные заголовки
Есть ли какой-либо способ передать собственные заголовки между производителем и приемником и записать их в столбцы назначения (извлечение значения из полученной темы).
Спасибо.