Errors / problems when using different converters in Kafka Connect — connecting to S3

#apache-kafka #avro #apache-kafka-connect #confluent-platform #confluent-schema-registry

Question:

I have been trying to get the Confluent kafka-connect image to connect to our on-prem S3. We have successfully written to S3 out of the box using Boto3, so we know it is not a connectivity issue.
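For reference, a minimal sketch of that Boto3 connectivity test (the endpoint URL and bucket name are assumed from the connector config below; the credential values are placeholders):

import boto3

# Point the client at the on-prem S3 endpoint instead of AWS.
# Endpoint and bucket are assumed from the connector config below;
# the credentials are placeholders.
s3 = boto3.client(
    "s3",
    endpoint_url="http://s3-server:9020",
    aws_access_key_id="ACCESS_KEY",
    aws_secret_access_key="SECRET_KEY",
)

# A plain write plus a listing proves connectivity and permissions.
s3.put_object(Bucket="inventory-stage", Key="connectivity-test.txt", Body=b"hello")
print(s3.list_objects_v2(Bucket="inventory-stage").get("Contents", []))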

Depending on which converters I use, they throw different errors.

Here are the environment variables set in the running Docker container.

 CONNECT_CONFIG_STORAGE_TOPIC=__kafka-connect-config
CONNECT_OFFSET_STORAGE_TOPIC=__kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC=__kafka-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3
CONNECT_CONFIG_STORAGE_PARTITIONS=1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3
CONNECT_OFFSET_STORAGE_PARTITIONS=1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3
CONNECT_STATUS_STORAGE_PARTITIONS=1
CONNECT_REST_ADVERTISED_HOST_NAME=hostname
CONNECT_REST_ADVERTIZED_LISTENER=listener
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLED=false
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLED=true
CONNECT_REST_ADVERTISED_PORT=8083
CONNECT_REPLICATION_FACTOR=2
CONNECT_GROUP_ID=APP-CONNECT
CONNECT_CONSUMER_BOOTSTRAP_SERVERS=SASL_SSL://server-1.com:9092,SASL_SSL://server-2.com:9092,SASL_SSL://server-3.com:9092
CONNECT_BOOTSTRAP_SERVERS=SASL_SSL://server-1.com:9092,SASL_SSL://server-2.com:9092,SASL_SSL://server-3.com:9092
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';
CONNECT_CONSUMER_SSL_PROTOCOL=SSL
CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.client.truststore.jks
CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD=password
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/kafka_connect/log4j/log4j.properties
CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components
CONNECT_REST_PORT=8083
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
CONNECT_SSL_PROTOCOL=SSL
CONNECT_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.client.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD=password
CONNECT_ZOOKEEPER_CONNECT=SASL_SSL://server-1.com:9092,SASL_SSL://server-2.com:9092,SASL_SSL://server-3.com:9092
  
 {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';",
    "flush.size": "1500",
    "topics": "inventory",
    "tasks.max": "2",
    "rotate.interval.ms": "1000",
    "consumer.sasl.mechanism": "PLAIN",
    "store.url": "http://s3-server:9020",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "internal.key.converter.schemas.enable": "false",
    "internal.value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enabled": "false",
    "value.converter.schemas.enabled": "true",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "inventory-2",
    "consumer.security.protocol": "SASL_SSL",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "inventory-stage"
}
  
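After submitting this config, the connector and task state can be polled over the Connect REST API; a minimal sketch (the worker host name is a placeholder, the REST port 8083 comes from the environment above):

import requests

# Hypothetical worker host; the REST port comes from the config above.
CONNECT_URL = "http://connect-worker:8083"

# Overall connector state plus per-task state; failed tasks carry a stack trace.
status = requests.get(f"{CONNECT_URL}/connectors/inventory-2/status").json()
print(status["connector"]["state"])
for task in status["tasks"]:
    print(task["id"], task["state"], task.get("trace", ""))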

The run appears to complete successfully. However, when I check the bucket, there are no objects in it. I have confirmed with kafka-avro-console-consumer that Avro messages do exist in the topic.

 [2019-04-11 18:14:52,612] INFO [Consumer clientId=consumer-42, groupId=connect-inventory-2] Resetting offset for partition inventory-0 to offset 9. (org.apache.kafka.clients.consumer.internals.Fetcher)
[2019-04-11 18:14:52,614] INFO Opening record writer for: topics/inventory/partition=2/inventory+2+0000000008.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2019-04-11 18:14:52,621] INFO [Consumer clientId=consumer-42, groupId=connect-inventory-2] Resetting offset for partition inventory-1 to offset 8. (org.apache.kafka.clients.consumer.internals.Fetcher)
[2019-04-11 18:14:52,621] WARN Property 'rotate.interval.ms' is set to '1000ms' but partitioner is not an instance of 'io.confluent.connect.storage.partitioner.TimeBasedPartitioner'. This property is ignored. (io.confluent.connect.s3.TopicPartitionWriter)
[2019-04-11 18:14:52,621] WARN Property 'rotate.interval.ms' is set to '1000ms' but partitioner is not an instance of 'io.confluent.connect.storage.partitioner.TimeBasedPartitioner'. This property is ignored. (io.confluent.connect.s3.TopicPartitionWriter)
[2019-04-11 18:14:52,626] INFO Opening record writer for: topics/inventory/partition=1/inventory+1+0000000008.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2019-04-11 18:14:52,645] INFO Opening record writer for: topics/inventory/partition=0/inventory+0+0000000009.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
  
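As a sanity check, a record can be verified to carry the Confluent Avro wire format (a zero magic byte followed by the 4-byte big-endian schema ID) without any Kafka client; a minimal sketch, using a hypothetical raw payload:

import struct

def confluent_schema_id(raw: bytes) -> int:
    """Return the schema ID from a Confluent-framed Avro message."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not Confluent Avro wire format")
    # Bytes 1-4 hold the Schema Registry ID as a big-endian int.
    return struct.unpack(">i", raw[1:5])[0]

# Hypothetical payload: magic byte 0, schema ID 41, then the Avro body.
print(confluent_schema_id(b"\x00\x00\x00\x00\x29avrodata"))  # -> 41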

Next I change the value converter to AvroConverter, thinking that the messages are in Avro and need to be deserialized for the Connect API.

 {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='pw';",
    "flush.size": "1500",
    "topics": "inventory",
    "tasks.max": "2",
    "rotate.interval.ms": "1000",
    "consumer.sasl.mechanism": "PLAIN",
    "store.url": "http://s3-server:9020",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "internal.key.converter.schemas.enable": "false",
    "internal.value.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enabled": "false",
    "value.converter.schemas.enabled": "true",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "inventory-2",
    "consumer.security.protocol": "SASL_SSL",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "inventory-stage"
}
  

This indicates that the Avro converter cannot find a schema with ID 41. However, that ID does exist in the Schema Registry. See below.

 [2019-04-11 18:26:56,813] ERROR WorkerSinkTask{id=inventory-2-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: inventory
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 41
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:302)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:290)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:129)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:230)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:139)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-04-11 18:26:56,814] ERROR WorkerSinkTask{id=inventory-2-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2019-04-11 18:26:56,815] INFO [Consumer clientId=consumer-44, groupId=connect-inventory-2] Sending LeaveGroup request to coordinator localhost:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
  

{
  "subject": "inventory-com.company.dcp.event.schema.shotify.SongCreatedEvent",
  "version": 1,
  "id": 41,
  "schema": "{\"type\":\"record\",\"name\":\"SongCreatedEvent\",\"namespace\":\"com.company.dcp.event.schema.shotify\",\"doc\":\"Information about the Song Added event\",\"fields\":[{\"name\":\"eventHeader\",\"type\":{\"type\":\"record\",\"name\":\"EventHeader\",\"namespace\":\"com.company.commons.shotify\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"time\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"source\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}},{\"name\":\"song\",\"type\":{\"type\":\"record\",\"name\":\"Song\",\"namespace\":\"com.company.commons.shotify\",\"fields\":[{\"name\":\"title\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Title of the Song\"},{\"name\":\"artist\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"The song composer\"},{\"name\":\"duration\",\"type\":\"int\",\"doc\":\"Song Duration in minutes\"},{\"name\":\"bitrate\",\"type\":\"int\",\"doc\":\"Song Bitrate, measured in kilobytes per second\"},{\"name\":\"lyrics\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Lyrics of the Song\"},{\"name\":\"fileURL\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Unique file Reference to the song\"}]}}],\"version\":\"2\"}"
}
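Note that the stack trace fails in lookUpSubjectVersion with error 40401 (Subject not found) rather than on the schema ID itself, so it is worth comparing the registered subject names against the subject the converter derives from the topic name (by default something like inventory-value). A minimal sketch over the Schema Registry REST API (assuming http://schema-registry:8081 is reachable from where this runs):

import requests

REGISTRY_URL = "http://schema-registry:8081"  # from the converter config above

# The ID itself resolves fine if this returns the schema JSON.
print(requests.get(f"{REGISTRY_URL}/schemas/ids/41").json())

# List every registered subject to compare against what the converter looks up
# (with the default naming strategy that would be "inventory-value").
print(requests.get(f"{REGISTRY_URL}/subjects").json())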


  

Comments:

1. Your flush size is 1500. How many messages have you sent? It also looks like you started from the latest offset.