#jdbc #apache-kafka #apache-kafka-connect
#jdbc #apache-kafka #apache-kafka-connect
Вопрос:
Я пытаюсь настроить простой приемник JDBC PostgreSQL для копирования данных из одной темы Kafka в мой экземпляр PostgreSQL.
Я хотел бы просто скопировать байты из одной темы Kafka в одну таблицу Postgres. Не должно быть никакого преобразования данных.
Я использую эти конвертеры:
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Сообщение об ошибке для меня очень странное:
Sink connector 'MY_CONNECTOR' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='MY_TOPIC',partition=0,offset=664253924,timestamp=1602894844052) with a byte[] value and bytes value schema.
В чем может быть проблема? Почему он запрашивает схему, если задействованы только байты?
Спасибо.
Ответ №1:
Приемник не знает, как распаковать массив байтов в столбцы вашей таблицы.
Поэтому вам понадобится структура с именованными полями, и внутри вы могли бы присвоить значения bytearray, предполагая, что типы столбцов являются большими двоичными объектами
Неясно, нужны ли вам как ключ, так и значение в базе данных, но самый простой способ создать структуру после ByteArrayConverter — поднять ее
"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "_c0"
Измените _c0
фактическое имя столбца базы данных