#apache-kafka #avro #apache-kafka-connect #confluent-platform #confluent-schema-registry
#apache-kafka #avro #apache-kafka-connect #confluent-schema-registry
Вопрос:
У меня есть тема, в которой в конечном итоге будет много разных схем. На данный момент у него есть только один. Я создал задание подключения через REST следующим образом:
{
"name":"com.mycompany.sinks.GcsSinkConnector-auth2",
"config": {
"connector.class": "com.mycompany.sinks.GcsSinkConnector",
"topics": "auth.events",
"flush.size": 3,
"my.setting":"bar",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry-service:8081",
"value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
"group.id":"account-archiver"
}
}
Затем я отправляю сообщение в эту тему со строковым ключом и сериализованной полезной нагрузкой avro. Если я проверю тему в центре управления, я увижу, что поступают правильно десериализованные данные.
Просматривая вывод из экземпляра connect, хотя я вижу это в журналах
RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Отсюда видно, что существуют две связанные проблемы:
Error retrieving Avro schema for id 7
Subject not found.; error code: 40401
Что меня беспокоит, так это то, что я указал стратегию как RecordNameStrategy, которая, я думаю, должна использовать волшебный байт для перехода и получения схемы в отличие от названия темы, но ошибки в теме не найдены. Я не уверен, действительно ли он ищет имя темы или получает схему по идентификатору. В любом случае, подключившись по ssh к экземпляру connect и выполнив curl для http://schema-registry-service:8081/schemas/ids/7
, я получаю возвращенную схему. Над этой трассировкой стека есть некоторые дополнительные записи, которые, к сожалению, выглядят так, как будто они все еще используют стратегию неправильного имени:
INFO AvroConverterConfig values:
schema.registry.url = [http://schema-registry-service:8081]
basic.auth.user.info = [hidden]
auto.register.schemas = false
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
У кого-нибудь есть какие-либо подсказки о том, как это решить? Я использую следующие изображения:
- confluentinc/cp-kafka-connect: 5.2.0
- confluentinc/ cp-kafka:5.1.0
Спасибо
Ответ №1:
В трассировке lookUpSubjectVersion
означает, что он пытался выполнить поиск по /subjects/:name/versions
каждому указанному там идентификатору, затем не смог найти schemaId=7
(Примечание: не version = 7), хотя из журналов не слишком ясно, что :name
он пытается здесь использовать, но если это не найдено, вы получите свою Subject not found
ошибку. Если мой PR был принят, имя темы было бы более понятным
Я полагаю, что это может быть связано с использованием RecordNameStrategy
. Глядя на PR для этого свойства, я понял, что оно действительно было протестировано только на коде производителя / потребителя, а не полностью в API Connect. По сравнению с поведением по умолчанию TopicNameStrategy
Который, как вы можете видеть, пытался использовать
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
Но, присмотревшись повнимательнее, я думаю, что вы, возможно, настроили ее неправильно.
Аналогично тому, как у вас есть value.converter.schema.registry.url
, вам действительно нужно было бы установить value.converter.value.subject.name.strategy
вместо этого.
Комментарии:
1. sob, так что лучше всего, вероятно, просто использовать конвертер byte [] и затем вручную выполнить поиск схемы?
2. Не могли бы вы попробовать сначала изменить свойство на
value.converter.value.subject.name.strategy
? В принципе,value.converter
префикс анализируется для всех конфигураций, специфичных для значения записи, затем в него нужно будет передать само имя конфигурацииvalue.subject.name.strategy
… Аналогично, если ключом является avro, установитеkey.converter.key.subject.name.strategy
3. кстати,
key.deserializer
ничего не делает в свойствах4. нет кости. использование
"key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://schema-registry-service:8081", "value.converter.value.schema.registry.url":"http://schema-registry-service:8081", "value.converter.value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
выдает мне ту же ошибку. Я создам свой собственный: ( Спасибо5. Ну, в журналах это показано
AvroConverterConfig values
, я думаю, что есть какой-то способ заставитьvalue.subject.name.strategy
отображать установленное вами значение. Посколькуschema.registry.url
там установлено (на основеvalue.converter.schema.registry.url
), тоvalue.subject.name.strategy
должно работать также после префикса конвертера… Примечание:value.converter.value.schema.registry.url
недопустимая конфигурация, посколькуvalue.schema.registry.url
сама по себе не является свойством
Ответ №2:
чтобы решить эту проблему, проверьте, что пространство имен в переменной
public static final org.apache.avro.Schema SCHEMA$
это то же самое, что и пакет сгенерированного файла Java
Комментарии:
1. Как это устраняет ошибку HTTP?