Вызвано: org.apache.kafka.common.errors.Исключение SerializationException: Ошибка регистрации схемы Avro

#apache-kafka #confluent-platform #confluent-schema-registry #debezium #confluent-cloud

#apache-kafka #confluent-platform #confluent-schema-registry #debezium #confluent-cloud

Вопрос:

У меня есть поток конвейера, в котором я подключаю соединитель debezium CDC mysql от confluent platform к Confluent Cloud, поскольку встроенный в облако соединитель debezium mysql находится в предварительном просмотре, и я успешно установил соединение, и сообщения из темы подписаны соединителем приемника S3. Изначально у меня был поток в формате json, но позже я захотел, чтобы это было в формате AVRO, поэтому я изменил файл конфигурации соединителя для преобразователей ключей и значений, как показано ниже:

Debezium connector json:

 {
    "name":"mysql_deb3",
    "config":{
       "connector.class":"io.debezium.connector.mysql.MySqlConnector",
       "tasks.max":"1",
       "database.hostname":"host_name",
       "database.port":"3306",
       "database.user":"user_name",
       "database.password":"password",
       "database.server.id":"123456789",
       "database.server.name": "server_name",
       "database.whitelist":"db_name",
       "database.history.kafka.topic":"dbhistory.db_name",
       "include.schema.changes": "true",
       "table.whitelist": "db_name.table_name",
       "tombstones.on.delete": "false",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
       "key.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "value.converter.schema.registry.url": "cloud_schema_registry_endpoint",
       "key.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "value.converter.schema.registry.basic.auth.user.info":"schema_registry_api_key:schema_registry_api_secret",
       "decimal.handling.mode": "double",
       "transforms": "unwrap",
       "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
       "transforms.unwrap.drop.tombstones": "true",
       "transforms.unwrap.delete.handling.mode": "rewrite",
"database.history.kafka.bootstrap.servers":"confluent_cloud_kafka_server_endpoint:9092",
"database.history.consumer.security.protocol":"SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm":"https",
"database.history.consumer.sasl.mechanism":"PLAIN",
"database.history.consumer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";",
"database.history.producer.security.protocol":"SASL_SSL",
"database.history.producer.ssl.endpoint.identification.algorithm":"https",
"database.history.producer.sasl.mechanism":"PLAIN",
"database.history.producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";",
    }
 }
 

####################################################################

connect-distributed.properties:

 bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
request.timeout.ms=20000
retry.backoff.ms=500

producer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
producer.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500

consumer.bootstrap.servers=confluent_cloud_kafka_server_endpoint:9092
consumer.ssl.endpoint.identification.algorithm=https
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="cloud_kafka_api" password="cloud_kafka_api_secret";
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500

offset.flush.interval.ms=10000
group.id=connect-cluster
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3

schema.registry.url=https://cloud_schema_registry_endpoint
schema.registry.basic.auth.user.info=<schema_registry_api_key>:<schema_registry_api_secret>
 

#################################################

— Я запускаю kafka connect с помощью —> bin/connect-distributed etc/connect-distributed.properties

— Соединение запускается нормально, но когда я пытаюсь загрузить соединитель debezium с помощью команды curl, появляется ошибка «неавторизованный», приведенная ниже, но предоставленные мной ключи и секреты api верны, что я также проверил его вручную с помощью cli.

Вызвано: org.apache.kafka.connect.errors.Исключение данных: staging-development-rds-cluster в io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:78) в org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266) в org.apache.kafka.connect.runtime.errors.Повторите попытку с toleranceoperator.execAndRetry(повторите попытку с toleranceoperator.java:128) в org.apache.kafka.connect.runtime.errors.Ошибка RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) … еще 11 Вызвано: org.apache.kafka.common.errors.SerializationException: ошибка регистрации схемы Avro: {«type»:»record»,»name»:»SchemaChangeKey»,»namespace»:»io.debezium.connector.mysql»,»fields»:[{«name»:»databaseName»,»type»:»string»}],»connect.name «:»ввод-вывод.debezium.connector.mysql.SchemaChangeKey»} Вызвано: io.confluent.kafka.schemaregistry.client.rest.exceptions.Исключение RestClientException: несанкционированный; код ошибки: 401 при вводе-выводе.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209) при вводе-выводе.confluent.kafka.schemaregistry.client.rest.RestService.HttpRequest(RestService.java:235) при вводе-выводе.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:326) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:318) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:313) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:119) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:156) в io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) в io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:117) в io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:76) в org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:266) в org.apache.kafka.connect.runtime.errors.Повторите попытку с toleranceoperator.execAndRetry(повторите попытку с toleranceoperator.java:128) в org.apache.kafka.connect.runtime.errors.Ошибка RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) в org.apache.kafka.connect.runtime.errors.Повторите попытку с помощью toleranceoperator.execute(RetryWithToleranceOperator.java:104) в org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:266) в org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293) в org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228) в org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) в org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java: 219) в java.util.concurrent.Исполнители $RunnableAdapter.call(Executors.java:511) в java.util.concurrent.FutureTask.run(FutureTask.java:266) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java: 1149) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) на java.lang.Thread.run(Thread.java:748) [2020-11-30 05:30:47,389] ОШИБКА WorkerSourceTask{id=mysql_deb3-0} Задача завершается и не будет восстановлена до перезапуска вручную (org.apache.kafka.connect.runtime.WorkerTask:178) [2020-11-30 05:30:47,389] ИНФОРМАЦИЯ Об остановке разъема (io.debezium.connector.common.BaseSourceTask:187) [2020-11-30 05:30:47,389] ИНФОРМАЦИЯ Об остановке задачи соединителя MySQL (io.debezium.connector.mysql.MySqlConnectorTask:458)

Пожалуйста, ребята, помогите мне в этом. Заранее спасибо