#java #apache-kafka #docker-compose #confluent-platform
#java #apache-kafka #docker-compose #confluent-schema-registry
Вопрос:
Итак, я изучаю Kafka и пытаюсь настроить её в локальной среде с помощью файла Docker Compose. Я следую примеру из:
https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html
Следуя этому примеру, я продвинулся довольно далеко, пока не дошёл до второй половины шага 8.
При попытке выполнить следующее: kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --from-beginning --max-messages 10
внутри контейнера Kafka Connect я получаю следующее сообщение, из которого не могу понять, к чему он пытается подключиться:
[2020-10-07 20:45:44,784] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-07 20:45:45,431] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-7022
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2020-10-07 20:45:45,547] INFO Kafka version : 2.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-07 20:45:45,547] INFO Kafka commitId : 4b1dd33f255ddd2f (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-07 20:45:45,731] INFO Cluster ID: 1ia_Zkc1S1efVJ8JbxiqFA (org.apache.kafka.clients.Metadata)
[2020-10-07 20:45:45,733] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Discovered group coordinator kafka:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,736] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-07 20:45:45,737] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,815] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,816] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Setting newly assigned partitions [quickstart-jdbc-test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-07 20:45:45,867] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Resetting offset for partition quickstart-jdbc-test-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
Processed a total of 1 messages
[2020-10-07 20:45:46,032] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:185)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:118)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:191)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:167)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:160)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:152)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:116)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Файл Docker Compose:
version: '3.8'
services:
zookeeper:
image: bitnami/zookeeper:latest
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ports:
- '2181'
kafka:
image: bitnami/kafka:latest
environment:
- KAFKA_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ZOOKEEPER_PROTOCOL=PLAINTEXT
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9091,CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9091,CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- "./Components/scripts/files/:/tmp/"
ports:
- '9093:9093'
schema-registry:
image: confluentinc/cp-schema-registry:5.0.0
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
ports:
- '8081'
depends_on:
- kafka
- zookeeper
kafka-connect-avro:
image: confluentinc/cp-kafka-connect:latest
environment:
- CONNECT_BOOTSTRAP_SERVERS=kafka:9092
- CONNECT_REST_PORT=8083
- CONNECT_GROUP_ID=quickstart-avro
- CONNECT_CONFIG_STORAGE_TOPIC=quickstart-avro-config
- CONNECT_OFFSET_STORAGE_TOPIC=quickstart-avro-offsets
- CONNECT_STATUS_STORAGE_TOPIC=quickstart-avro-status
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
- CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect-avro
- CONNECT_LOG4J_ROOT_LOGLEVEL=FATAL
- CONNECT_PLUGIN_PATH=/usr/share/java/, /etc/kafka-connect/jars
volumes:
- "./Components/Avro/jars:/etc/kafka-connect/jars"
- "./Components/Avro/file:/tmp/quickstart"
- "./Components/Avro/plugin/confluentinc-kafka-connect-jdbc-5.5.2:/usr/share/java/kafka-connect-jdbc"
ports:
- '8083:8083'
depends_on:
- kafka
mysql:
image: mysql
environment:
- MYSQL_ROOT_PASSWORD=confluent
- MYSQL_USER=confluent
- MYSQL_PASSWORD=confluent
- MYSQL_DATABASE=connect_test
ports:
- "3306:3306"
volumes:
- "./volumes/mysql:/var/lib/mysql"
Ответ №1:
kafka-avro-console-consumer
по умолчанию использует адрес http://localhost:8081
для подключения к реестру схем.
Для подключения к контейнеру реестра схемы необходимо добавить еще один аргумент, например --property schema.registry.url=http://schema-registry:8081
https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-avro.html#sr-test-drive-avro
Или вы можете зайти через docker exec в контейнер реестра схем и выполнить ту же команду там без дополнительных аргументов, поскольку по умолчанию используется локальный экземпляр.
Комментарии:
1. Большое спасибо, я не знал о настройке этого свойства! В моём примере я добавил «--property schema.registry.url=http://schema-registry:8081», и сообщения были разобраны, как и ожидалось.
2. спасибо за это. но как я могу добавить учетные данные, если реестр схемы имеет базовую аутентификацию? Я проверил документацию и не увидел полей «пользователь» или «пароль».
3. О, неважно, вот это:
--property schema.registry.basic.auth.user.info=user:pass --property schema.registry.basic.auth.credentials.source=USER_INFO
сработало.