#apache-kafka #apache-kafka-connect
#апачи-кафка #apache-kafka-connect
Вопрос:
После регистрации соединителя данных kafka — с пользовательской схемой — статус показывает, что он сбойный: «состояние»: «сбой».
Вот регистрация
$ curl -i -X PUT http://localhost:8083/connectors/datagen01/config -H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "topicx",
"schema.filename": "myschema.avro",
"schema.keyfield": "userid",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "1"
}'
HTTP/1.1 200 OK
Date: Fri, 11 Dec 2020 15:51:37 GMT
Content-Type: application/json
Content-Length: 455
Server: Jetty(9.4.24.v20191120)
Вот проверка состояния («Сбой»)
curl -s http://localhost:8083/connectors/datagen01/status
{"name":"datagen01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.avro.SchemaParseException: Cannot parse <null> schemantat org.apache.avro.Schema.parse(Schema.java:1595)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1394)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1365)ntat io.confluent.avro.random.generator.Generator$Builder.schemaStream(Generator.java:277)ntat io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:174)ntat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)ntat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)ntat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)ntat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)ntat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)ntat java.base/java.lang.Thread.run(Thread.java:834)n"}],"type":"source"}
(ПРИМЕЧАНИЕ: команда curl выполняется из той же папки, которая содержит файл myschema.avro)
вот дополнительная справочная информация, если это необходимо
Dockerfile…
FROM confluentinc/cp-kafka-connect-base:6.0.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
docker-compose.yml
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:6.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
schema-registry:
image: confluentinc/cp-schema-registry:6.0.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
connect:
image: localimage/kafka-connect-datagen:latest
build:
context: .
dockerfile: Dockerfile
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
myschema.avro
{
"namespace": "ksql",
"name": "users",
"type": "record",
"fields": [
{"name": "registertime", "type": {
"type": "long",
"arg.properties": {
"range": {
"min": 1487715775521,
"max": 1519273364600
}
}
}},
{"name": "userid", "type": {
"type": "string",
"arg.properties": {
"regex": "User_[1-9]{0,1}"
}
}},
{"name": "regionid", "type": {
"type": "string",
"arg.properties": {
"regex": "Region_[1-9]?"
}
}},
{"name": "gender", "type": {
"type": "string",
"arg.properties": {
"options": [
"MALE",
"FEMALE",
"OTHER"
]
}
}}
]
}
запустил docker-compose…
docker-compose up -d --build
конфигурация зарегистрированного соединителя datagen
$ curl -i -X PUT http://localhost:8083/connectors/datagen01/config -H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "topicx",
"schema.filename": "myschema.avro",
"schema.keyfield": "userid",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "1"
}'
HTTP/1.1 200 OK
Date: Fri, 11 Dec 2020 15:51:37 GMT
Content-Type: application/json
Content-Length: 455
Server: Jetty(9.4.24.v20191120)
проверенный статус
curl -s http://localhost:8083/connectors/datagen01/status
{"name":"datagen01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.avro.SchemaParseException: Cannot parse <null> schemantat org.apache.avro.Schema.parse(Schema.java:1595)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1394)ntat org.apache.avro.Schema$Parser.parse(Schema.java:1365)ntat io.confluent.avro.random.generator.Generator$Builder.schemaStream(Generator.java:277)ntat io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:174)ntat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)ntat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)ntat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)ntat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)ntat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)ntat java.base/java.lang.Thread.run(Thread.java:834)n"}],"type":"source"}
Ответ №1:
Он сообщает вам, почему произошел сбой. Схема имеет значение null.
Если schema.filename
его нет в контейнере, то схема не сможет быть прочитана, независимо от того, где была запущена команда curl
Вы можете либо скопировать его в контейнер, либо использовать монтирование тома, и вы также захотите использовать полный путь к файлу в конфигурации
Комментарии:
1. спасибо — как, вероятно, очевидно, я в некотором роде докер, докер-композит и кафка-новичок. —Я попробую ваше предложение и отмечу ваш ответ, как только я проверю. ценю вашу помощь