#c# #azure #apache-kafka #apache-kafka-connect #azure-iot-hub
#c# #azure #apache-kafka #apache-kafka-connect #azure-iot-hub
Вопрос:
Я работаю над облачным решением Azure. Я использую IoT Hub, подключенный к Kafka, для обработки данных, поступающих с различных устройств IoT. Я сталкиваюсь с тем, что все данные, поступающие с нескольких устройств, хранятся в одной теме . Однако я хочу обрабатывать данные каждого устройства, подключенного к IoT Hub, в определенную тему в Kafka (у каждого устройства есть своя тема Kafka)
Toketi «Соединитель источника Kafka Connect для Azure IoT Hub» предоставляет следующий конфигурационный файл (пограничный узел)
connector.class=com.microsoft.azure.iot.kafka.connect.source.IotHubSourceConnector
name=AzureIotHubConnector
tasks.max=1
Kafka.Topic=IotTopic
IotHub.EventHubCompatibleName=iothub-toketi
IotHub.EventHubCompatibleEndpoint=sb://iothub-001.servicebus.windows.net/
IotHub.AccessKeyName=service
IotHub.AccessKeyValue=4KsdfiB9J899a N3iwerjKwzeqbZUj1K//KKj1ye9i3=
IotHub.ConsumerGroup=$Default
IotHub.Partitions=4
IotHub.StartTime=2016-11-28T00:00:00Z
IotHub.Offsets=
BatchSize=100
ReceiveTimeout=60
Для одной темы работает хранение всех данных с нескольких устройств, но я ожидаю сделать изоляцию между данными, поступающими с устройств
Любые решения или идеи!!
Спасибо
Ответ №1:
Одним из решений является использование SMT (преобразование одного сообщения).
Поток соединителя источника содержит несколько шагов:
- Данные опроса из внешнего источника как
List<SourceRecord>
- Преобразование каждого сообщения (
SourceRecord
) с использованием определенного SMT (может быть пропущено, если преобразование не определено - Преобразуйте ключ и значение
SourceRecord
в массивы байтов. - Отправить сообщение через
KafkaProducer
Kafka
Kafka Connect определяет, в какую тему отправлять сообщение на основе SourceRecord::topic
поля. Используя SMT, вы можете установить правильное значение темы.
Чистый Apache Kafka Connect не имеет такого преобразования. Если вы используете Confluent Platform, доступны некоторые дополнительные преобразования. Чтобы извлечь название темы, вы можете использовать ExtractTopic . У него есть свойство, которое называется field
Подробнее обо всей концепции SMT можно найти на веб-странице Apache Kafka или на веб-странице Confluent
Комментарии:
1. я не работаю в confluent plateform, чтобы использовать его параметры (kafka в Azure), и внедрение SMT в Kafka не отвечает моим потребностям. Можно найти какое-либо другое решение??
2. @benkhederrami, я думаю, вы можете скачать connect-transformations из Confluent и установить его в свой Kafka Connect ( confluent.io/connector/connect-transformations ). Это можно сделать так же, как и стандартный коннектор. Соединители, конвертеры и преобразования могут быть установлены в качестве плагина для Kafka Connect
3. Как я могу извлечь имя из своего сообщения и использовать его в качестве названия темы, а затем сохранить все сообщения, содержащие одно и то же имя. Я дал ему сообщение в формате json, но он указал, что он может структурировать поле только из сообщения struct.