Соединительный разъем приемника InfluxDB

#apache-kafka-connect #confluent-platform

#apache-kafka-connect #confluent-платформа

Вопрос:

Я пытаюсь использовать соединитель приемника Confluent InfluxDB для получения данных из темы в мою InfluxDB. Конфигурация выглядит следующим образом:

 connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=https://mydb
topics=mytopic
tasks.max=1
  

Все, что я получаю при создании нового соединителя через пользовательский интерфейс Kafka Connect, — это следующее исключение:

 org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
    at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more
  

Значения в разделе представляют собой строки json, подобные этому: {"pid":1,"filename":"test1.csv"} . Есть ли какая-либо конфигурация, которой мне здесь не хватает?

Обновление: Вот моя рабочая конфигурация:

 config.storage.topic=kafka-connect-my-config
rest.port=28082
group.id=kafka-connect-mygroup
plugin.path=/usr/share/java,/connect-plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=kafka-connect-my-offsets
bootstrap.servers={my broker urls}
value.converter=org.apache.kafka.connect.storage.StringConverter
status.storage.topic=kafka-connect-my-status
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.advertised.host.name=kafka-development-kafka-connect-1
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
  

Комментарии:

1. Можете ли вы также предоставить общий доступ к своему рабочему properties файлу

Ответ №1:

Соединителю InfluxDB требуется, чтобы в данных присутствовала схема, поэтому, если у вас есть данные JSON, вам необходимо установить

 value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
  

НО ваш JSON должен включать схему, поэтому вместо

 {"pid":1,"filename":"test1.csv"}
  

вам понадобится что-то вроде

 {
    "schema": {
        "type": "struct", "optional": false, "version": 1, "fields": [
            { "field": "pid", "type": "string", "optional": true },
            { "field": "filename", "type": "string", "optional": true }
        ]
    },
    "payload": {
        "pid": 1,
        "filename": "test1.csv"
    }
}
  

Ссылка:https://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect /

Более подробную информацию о том, как применить схему к вашим данным, смотрите в этом блоге

Более подробную информацию о конвертерах в целом смотрите в этой статье.

Комментарии:

1. Спасибо, не знал, что вы можете настраивать конвертеры для каждого соединителя. Я создал новую конфигурацию с предоставленными вами свойствами. В пользовательском интерфейсе появилось предупреждение: Warning: Config "value.converter.schemas.enable" is unknown и теперь stacktrace сообщает Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct .

2. Какую версию платформы Confluent вы используете?

3. Извините за поздний ответ. Мы используем 5.1.2

4. @user6845507 Удалось ли вам решить проблему? Если да, то каково решение?