Соединитель Kafka и реестр схем — ошибка при получении схемы Avro — объект не найден

#apache-kafka #avro #apache-kafka-connect #confluent-platform #confluent-schema-registry

#apache-kafka #avro #apache-kafka-connect #confluent-schema-registry

Вопрос:

У меня есть тема, в которой в конечном итоге будет много разных схем. На данный момент у него есть только один. Я создал задание подключения через REST следующим образом:

 {
 "name":"com.mycompany.sinks.GcsSinkConnector-auth2",
 "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting":"bar",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry-service:8081",
    "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "group.id":"account-archiver"

 }
}
  

Затем я отправляю сообщение в эту тему со строковым ключом и сериализованной полезной нагрузкой avro. Если я проверю тему в центре управления, я увижу, что поступают правильно десериализованные данные.
Просматривая вывод из экземпляра connect, хотя я вижу это в журналах

 RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  

Отсюда видно, что существуют две связанные проблемы:

  • Error retrieving Avro schema for id 7
  • Subject not found.; error code: 40401

Что меня беспокоит, так это то, что я указал стратегию как RecordNameStrategy, которая, я думаю, должна использовать волшебный байт для перехода и получения схемы в отличие от названия темы, но ошибки в теме не найдены. Я не уверен, действительно ли он ищет имя темы или получает схему по идентификатору. В любом случае, подключившись по ssh к экземпляру connect и выполнив curl для http://schema-registry-service:8081/schemas/ids/7 , я получаю возвращенную схему. Над этой трассировкой стека есть некоторые дополнительные записи, которые, к сожалению, выглядят так, как будто они все еще используют стратегию неправильного имени:

 INFO AvroConverterConfig values:
    schema.registry.url = [http://schema-registry-service:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = false
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
  

У кого-нибудь есть какие-либо подсказки о том, как это решить? Я использую следующие изображения:

  • confluentinc/cp-kafka-connect: 5.2.0
  • confluentinc/ cp-kafka:5.1.0

Спасибо

Ответ №1:

В трассировке lookUpSubjectVersion означает, что он пытался выполнить поиск по /subjects/:name/versions каждому указанному там идентификатору, затем не смог найти schemaId=7 (Примечание: не version = 7), хотя из журналов не слишком ясно, что :name он пытается здесь использовать, но если это не найдено, вы получите свою Subject not found ошибку. Если мой PR был принят, имя темы было бы более понятным

Я полагаю, что это может быть связано с использованием RecordNameStrategy . Глядя на PR для этого свойства, я понял, что оно действительно было протестировано только на коде производителя / потребителя, а не полностью в API Connect. По сравнению с поведением по умолчанию TopicNameStrategy

Который, как вы можете видеть, пытался использовать

 value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
  

Но, присмотревшись повнимательнее, я думаю, что вы, возможно, настроили ее неправильно.

Аналогично тому, как у вас есть value.converter.schema.registry.url , вам действительно нужно было бы установить value.converter.value.subject.name.strategy вместо этого.

Комментарии:

1. sob, так что лучше всего, вероятно, просто использовать конвертер byte [] и затем вручную выполнить поиск схемы?

2. Не могли бы вы попробовать сначала изменить свойство на value.converter.value.subject.name.strategy ? В принципе, value.converter префикс анализируется для всех конфигураций, специфичных для значения записи, затем в него нужно будет передать само имя конфигурации value.subject.name.strategy … Аналогично, если ключом является avro, установите key.converter.key.subject.name.strategy

3. кстати, key.deserializer ничего не делает в свойствах

4. нет кости. использование "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://schema-registry-service:8081", "value.converter.value.schema.registry.url":"http://schema-registry-service:8081", "value.converter.value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy", выдает мне ту же ошибку. Я создам свой собственный: ( Спасибо

5. Ну, в журналах это показано AvroConverterConfig values , я думаю, что есть какой-то способ заставить value.subject.name.strategy отображать установленное вами значение. Поскольку schema.registry.url там установлено (на основе value.converter.schema.registry.url ), то value.subject.name.strategy должно работать также после префикса конвертера… Примечание: value.converter.value.schema.registry.url недопустимая конфигурация, поскольку value.schema.registry.url сама по себе не является свойством

Ответ №2:

чтобы решить эту проблему, проверьте, что пространство имен в переменной

 public static final org.apache.avro.Schema SCHEMA$ 
  

это то же самое, что и пакет сгенерированного файла Java

Комментарии:

1. Как это устраняет ошибку HTTP?