#apache-kafka-connect #confluent-platform
#apache-kafka-connect #confluent-платформа
Вопрос:
Я пытаюсь использовать соединитель приемника Confluent InfluxDB для получения данных из темы в мою InfluxDB. Конфигурация выглядит следующим образом:
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=https://mydb
topics=mytopic
tasks.max=1
Все, что я получаю при создании нового соединителя через пользовательский интерфейс Kafka Connect, — это следующее исключение:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
Значения в разделе представляют собой строки json, подобные этому: {"pid":1,"filename":"test1.csv"}
. Есть ли какая-либо конфигурация, которой мне здесь не хватает?
Обновление: Вот моя рабочая конфигурация:
config.storage.topic=kafka-connect-my-config
rest.port=28082
group.id=kafka-connect-mygroup
plugin.path=/usr/share/java,/connect-plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=kafka-connect-my-offsets
bootstrap.servers={my broker urls}
value.converter=org.apache.kafka.connect.storage.StringConverter
status.storage.topic=kafka-connect-my-status
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.advertised.host.name=kafka-development-kafka-connect-1
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
Комментарии:
1. Можете ли вы также предоставить общий доступ к своему рабочему
properties
файлу
Ответ №1:
Соединителю InfluxDB требуется, чтобы в данных присутствовала схема, поэтому, если у вас есть данные JSON, вам необходимо установить
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
НО ваш JSON должен включать схему, поэтому вместо
{"pid":1,"filename":"test1.csv"}
вам понадобится что-то вроде
{
"schema": {
"type": "struct", "optional": false, "version": 1, "fields": [
{ "field": "pid", "type": "string", "optional": true },
{ "field": "filename", "type": "string", "optional": true }
]
},
"payload": {
"pid": 1,
"filename": "test1.csv"
}
}
Ссылка:https://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect /
Более подробную информацию о том, как применить схему к вашим данным, смотрите в этом блоге
Более подробную информацию о конвертерах в целом смотрите в этой статье.
Комментарии:
1. Спасибо, не знал, что вы можете настраивать конвертеры для каждого соединителя. Я создал новую конфигурацию с предоставленными вами свойствами. В пользовательском интерфейсе появилось предупреждение:
Warning: Config "value.converter.schemas.enable" is unknown
и теперь stacktrace сообщаетCaused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
.2. Какую версию платформы Confluent вы используете?
3. Извините за поздний ответ. Мы используем 5.1.2
4. @user6845507 Удалось ли вам решить проблему? Если да, то каково решение?