кафка коннект — s3sink — декодирование двоично-десятичных значений

#apache-kafka #apache-kafka-connect

Вопрос:

Я использую debezium для передачи данных в кафку, а режим десятичной обработки precise

Также эти строки добавлены в мою конфигурацию(debezium):

 "key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
 

Но при использовании этого с помощью разъема s3-приемника значения все еще находятся в двоичном формате.

Есть ли какие-либо настройки на стороне разъема приемника для его декодирования?

Обновление:1:

Мой конвертер значений-реестр схем aws glue.

 "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter"
 

Обновление 2:

Конфигурация Debezium:

 {
    "name": "mysql-connecter-01",
    "config": {
        "name": "mysql-connecter-01",
        "connecter.class": "io.debezium.connecter.mysql.MySqlconnecter",
        "database.server.id": "1",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "KAFKA_BROKER_NODE_IP:9092",
        "database.history.kafka.topic": "mysql-db01.schema-changes.mysql",
        "database.server.name": "mysql-db01",
        "database.hostname": "MYSQL_IP",
        "database.port": "3306",
        "database.user": "MYSQL_USER",
        "database.password": "MYSQL_PASS",
        "database.whitelist": "bhuvi",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.source.fields": "ts_ms",
        "tombstones.on.delete": false,
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "internal.key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
        "internal.key.converter.schemas.enable": "false",
        "internal.value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
        "internal.value.converter.schemas.enable": "false",
        "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.region": "ap-south-1",
        "key.converter.schemaAutoRegistrationEnabled": "true",
        "value.converter.schemaAutoRegistrationEnabled": "true",
        "key.converter.avroRecordType": "GENERIC_RECORD",
        "value.converter.avroRecordType": "GENERIC_RECORD",
        "key.converter.registry.name": "bhuvi-debezium",
        "value.converter.registry.name": "bhuvi-debezium",
        "snapshot.mode": "initial"
    }
}
 

Комментарии:

1. В самих свойствах конфигурации S3 их нет. Обработка десериализации будет выполняться в конвертере, если не в источнике. В противном случае, это то, transforms что вы хотите. Однако можете ли вы поделиться своей полной конфигурацией debezium?

2. вопрос обновлен