Консольный потребитель Docker Kafka Avro — отказано в подключении

#java #apache-kafka #docker-compose #confluent-platform

#java #apache-kafka #docker-compose #слияние-схема-реестр

Вопрос:

итак, я изучаю Kafka и просто пытаюсь настроить его в локальной среде с помощью файла docker compose. Я следую примеру из:

https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html

Следуя этому примеру, я продвинулся довольно далеко, пока не перейду к более поздней половине шага 8.

При попытке выполнить следующее: kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --from-beginning --max-messages 10 внутри контейнера Kafka Connect я получаю следующее сообщение, в котором я не могу определить, к чему он пытается подключиться:

 [2020-10-07 20:45:44,784] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-07 20:45:45,431] INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-7022
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2020-10-07 20:45:45,547] INFO Kafka version : 2.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-07 20:45:45,547] INFO Kafka commitId : 4b1dd33f255ddd2f (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-07 20:45:45,731] INFO Cluster ID: 1ia_Zkc1S1efVJ8JbxiqFA (org.apache.kafka.clients.Metadata)
[2020-10-07 20:45:45,733] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Discovered group coordinator kafka:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,736] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-07 20:45:45,737] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,815] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,816] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Setting newly assigned partitions [quickstart-jdbc-test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-07 20:45:45,867] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Resetting offset for partition quickstart-jdbc-test-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
Processed a total of 1 messages
[2020-10-07 20:45:46,032] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
        at sun.net.www.http.HttpClient.New(HttpClient.java:339)
        at sun.net.www.http.HttpClient.New(HttpClient.java:357)
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:185)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:118)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:191)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:167)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:160)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:152)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:116)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
  

Docker-создать файл:

 version: '3.8'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    ports:
      - '2181' 
  kafka:
    image: bitnami/kafka:latest
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ZOOKEEPER_PROTOCOL=PLAINTEXT
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9091,CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9091,CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    volumes:
      - "./Components/scripts/files/:/tmp/"
    ports:
      - '9093:9093'
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    ports:
      - '8081'
    depends_on:
      - kafka
      - zookeeper
  kafka-connect-avro:
    image: confluentinc/cp-kafka-connect:latest
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=kafka:9092 
      - CONNECT_REST_PORT=8083 
      - CONNECT_GROUP_ID=quickstart-avro 
      - CONNECT_CONFIG_STORAGE_TOPIC=quickstart-avro-config 
      - CONNECT_OFFSET_STORAGE_TOPIC=quickstart-avro-offsets 
      - CONNECT_STATUS_STORAGE_TOPIC=quickstart-avro-status
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 
      - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter 
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter 
      - CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect-avro
      - CONNECT_LOG4J_ROOT_LOGLEVEL=FATAL 
      - CONNECT_PLUGIN_PATH=/usr/share/java/, /etc/kafka-connect/jars
    volumes:
      - "./Components/Avro/jars:/etc/kafka-connect/jars"
      - "./Components/Avro/file:/tmp/quickstart"
      - "./Components/Avro/plugin/confluentinc-kafka-connect-jdbc-5.5.2:/usr/share/java/kafka-connect-jdbc"
    ports:
      - '8083:8083'
    depends_on:
      - kafka
  mysql:
    image: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=confluent 
      - MYSQL_USER=confluent 
      - MYSQL_PASSWORD=confluent 
      - MYSQL_DATABASE=connect_test 
    ports:
      - "3306:3306" 
    volumes:
      - "./volumes/mysql:/var/lib/mysql"
  

Ответ №1:

kafka-avro-console-consumer по умолчанию используется http://localhost:8081 для реестра схемы

Для подключения к контейнеру реестра схемы необходимо добавить еще один аргумент, например --property schema.registry.url=http://schema-registry:8081

https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-avro.html#sr-test-drive-avro

Или вы можете запустить exec в контейнер реестра схемы и запустить ту же команду без дополнительных аргументов, поскольку по умолчанию используется локальный экземпляр

Комментарии:

1. Большое спасибо, я не знал о настройке свойства! Для моего примера я добавил «—property schema.registry.url= schema-registry:8081 «, и он смог проанализировать его, как и ожидалось.

2. спасибо за это. но как я могу добавить учетные данные, если реестр схемы имеет базовую аутентификацию? Я проверил документацию и не увидел полей «пользователь» или «пароль».

3. о, nvm, --property schema.registry.basic.auth.user.info=user:pass --property schema.registry.basic.auth.credentials.source=USER_INFO сработал