#jdbc #apache-kafka #avro #apache-kafka-connect
#jdbc #apache-kafka #avro #apache-kafka-connect
Вопрос:
Мы пытаемся записать обратно значения из раздела в базу данных postgres, используя соединительный соединитель приемника JDBC.
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
Мы можем прочитать значение в консоли с помощью:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name
Схема существует, и значение правильно десериализуется с помощью kafka-avro-console-consumer
, поскольку оно не выдает ошибок, но соединитель выдает эти ошибки:
{
"name": "datawarehouse_sink",
"connector": {
"state": "RUNNING",
"worker_id": "x.x.x.x:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "x.x.x.x:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handlerntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)ntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)ntat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)ntat java.util.concurrent.FutureTask.run(FutureTask.java:266)ntat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)ntat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)ntat java.lang.Thread.run(Thread.java:748)nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sinkntat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)ntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)ntat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)nt... 13 morenCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!n"
}
],
"type": "sink"
}
Последняя ошибка :
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Схема зарегистрирована в реестре схем.
Связана ли проблема с файлом конфигурации соединителя?
Ответ №1:
Ошибка org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
означает, что сообщение по теме было недопустимым Avro и не могло быть десериализовано. Существует несколько причин, по которым это может быть:
-
Некоторые сообщения являются Avro, но другие нет. Если это так, вы можете использовать возможности обработки ошибок в Kafka Connect, чтобы игнорировать недопустимые сообщения, используя конфигурацию, подобную этой:
"errors.tolerance": "all", "errors.log.enable":true, "errors.log.include.messages":true
-
Значением является Avro, а ключом — нет. Если это так, то используйте соответствующий
key.converter
.
Подробнее:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained /
Ответ №2:
Это означает, что десериализатор проверил первые 5 байтов сообщения и обнаружил нечто неожиданное. Подробнее об упаковке сообщений с помощью сериализатора здесь , проверьте раздел «формат передачи». Просто предположу, что нулевой байт в сообщении! = 0