Ошибка схемы в приемнике JDBC с использованием ByteArrayConverter для ключа и значения

#jdbc #apache-kafka #apache-kafka-connect

#jdbc #apache-kafka #apache-kafka-connect

Вопрос:

Я пытаюсь настроить простой приемник JDBC PostgreSQL для копирования данных из одной темы Kafka в мой экземпляр PostgreSQL.

Я хотел бы просто скопировать байты из одной темы Kafka в одну таблицу Postgres. Не должно быть никакого преобразования данных.

Я использую эти конвертеры:

 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
  

Сообщение об ошибке для меня очень странное:

Sink connector 'MY_CONNECTOR' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='MY_TOPIC',partition=0,offset=664253924,timestamp=1602894844052) with a byte[] value and bytes value schema.

В чем может быть проблема? Почему он запрашивает схему, если задействованы только байты?

Спасибо.

Ответ №1:

Приемник не знает, как распаковать массив байтов в столбцы вашей таблицы.

Поэтому вам понадобится структура с именованными полями, и внутри вы могли бы присвоить значения bytearray, предполагая, что типы столбцов являются большими двоичными объектами

Неясно, нужны ли вам как ключ, так и значение в базе данных, но самый простой способ создать структуру после ByteArrayConverter — поднять ее

 "transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "_c0"
  

Измените _c0 фактическое имя столбца базы данных