почему простой соединитель kafka datagen завершается с ошибкой? (т.Е. «Состояние»: «сбой»)

#apache-kafka #apache-kafka-connect

#апачи-кафка #apache-kafka-connect

Вопрос:

После регистрации соединителя данных kafka — с пользовательской схемой — статус показывает, что он сбойный: «состояние»: «сбой».

Вот регистрация

 $ curl -i -X PUT http://localhost:8083/connectors/datagen01/config  -H "Content-Type: application/json"  -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "topicx",
    "schema.filename": "myschema.avro",
    "schema.keyfield": "userid",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1"
}'
HTTP/1.1 200 OK
Date: Fri, 11 Dec 2020 15:51:37 GMT
Content-Type: application/json
Content-Length: 455
Server: Jetty(9.4.24.v20191120)
 

Вот проверка состояния («Сбой»)

 curl -s http://localhost:8083/connectors/datagen01/status
{"name":"datagen01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.avro.SchemaParseException: Cannot parse <null> schemantat org.apache.avro.Schema.parse(Schema.java:1595)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1394)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1365)ntat io.confluent.avro.random.generator.Generator$Builder.schemaStream(Generator.java:277)ntat io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:174)ntat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)ntat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)ntat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)ntat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)ntat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)ntat java.base/java.lang.Thread.run(Thread.java:834)n"}],"type":"source"} 
 

(ПРИМЕЧАНИЕ: команда curl выполняется из той же папки, которая содержит файл myschema.avro)


вот дополнительная справочная информация, если это необходимо

Dockerfile…

 FROM confluentinc/cp-kafka-connect-base:6.0.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
 

docker-compose.yml

 version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  connect:
    image: localimage/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      
 

myschema.avro

 {
    "namespace": "ksql",
    "name": "users",
    "type": "record",
    "fields": [
        {"name": "registertime", "type": {
            "type": "long",
            "arg.properties": {
                "range": {
                    "min": 1487715775521,
                    "max": 1519273364600
                }
            }
        }},
        {"name": "userid", "type": {
            "type": "string",
            "arg.properties": {
                "regex": "User_[1-9]{0,1}"
            }
        }},
        {"name": "regionid", "type": {
            "type": "string",
            "arg.properties": {
                "regex": "Region_[1-9]?"
            }
        }},
        {"name": "gender", "type": {
            "type": "string",
            "arg.properties": {
                "options": [
                    "MALE",
                    "FEMALE",
                    "OTHER"
                ]
            }
        }}
    ]
}   
 

запустил docker-compose…

     docker-compose up -d --build
    
 

конфигурация зарегистрированного соединителя datagen

 $ curl -i -X PUT http://localhost:8083/connectors/datagen01/config  -H "Content-Type: application/json"  -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "topicx",
    "schema.filename": "myschema.avro",
    "schema.keyfield": "userid",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1"
}'
HTTP/1.1 200 OK
Date: Fri, 11 Dec 2020 15:51:37 GMT
Content-Type: application/json
Content-Length: 455
Server: Jetty(9.4.24.v20191120)
 

проверенный статус

 curl -s http://localhost:8083/connectors/datagen01/status
{"name":"datagen01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.avro.SchemaParseException: Cannot parse <null> schemantat org.apache.avro.Schema.parse(Schema.java:1595)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1394)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1365)ntat io.confluent.avro.random.generator.Generator$Builder.schemaStream(Generator.java:277)ntat io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:174)ntat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)ntat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)ntat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)ntat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)ntat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)ntat java.base/java.lang.Thread.run(Thread.java:834)n"}],"type":"source"} 
 

Ответ №1:

Он сообщает вам, почему произошел сбой. Схема имеет значение null.

Если schema.filename его нет в контейнере, то схема не сможет быть прочитана, независимо от того, где была запущена команда curl

Вы можете либо скопировать его в контейнер, либо использовать монтирование тома, и вы также захотите использовать полный путь к файлу в конфигурации

Комментарии:

1. спасибо — как, вероятно, очевидно, я в некотором роде докер, докер-композит и кафка-новичок. —Я попробую ваше предложение и отмечу ваш ответ, как только я проверю. ценю вашу помощь