Реестр схемы слияния — Автопроизводитель — Ошибка подключения

#python #apache-kafka #avro #confluent-schema-registry #confluent-kafka-python

Вопрос:

Я пытаюсь создавать сообщения в формате Avro, используя класс AvroProducer из confluent_kafka. Кафка и Схема-Реестр работают как кластер из 3 узлов в одной сети.

Я читаю схему из строки и инициализирую AvroProducer, как показано ниже:

 value_schema = loads("""{"doc": "Messages to be written.",
        "namespace": "schemas.avro",
        "type": "record",
        "name": "kafkagwo",
        "fields": [
            {"name": "timestamp", "type": "string"},
            {"name": "message", "type": "string"}
        ]}""")
key_schema = loads('{"type": "string"}')
p = AvroProducer({'bootstrap.servers': 'broker1,broker2,broker3',
                 'schema.registry.url': 'http://broker1:8081'})
 

Прослушиватель схемы-реестра настроен на http://localhost:8081 на стороне сервера реестра схемы, это брокер1.

Затем попробуйте отправить сообщения, используя приведенный ниже код

 value = {'timestamp': timestamp, 'message': message}
p.produce(topic = 'topic-1', partition=0,
          key=str('key_0'),
          value=value, callback=delivery_report, 
          value_schema = value_schema, key_schema = key_schema)
 

Что я получаю, так это

 ConnectionError: HTTPConnectionPool(host='broker1', port=8081): Max retries exceeded with url: /subjects/topic-1-value/versions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x00000165C1522D88>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
 

Я не использую контейнер Docker. Кластеры состоят из 3 отдельных виртуальных машин, в которых установлены и запущены Kafka и схема реестра, поэтому они также не являются автономными.
Код Python выполняется с 4-й виртуальной машины, имеющей доступ к сети и исключение брандмауэра для нее.
На самом деле я могу создавать и использовать сообщения без avro и схемы реестра, поэтому я не ожидаю, что проблема будет связана с сетью, но я открыт для идей.
Когда я пытаюсь использовать Avro со схемой реестра, я получаю вышеуказанную ошибку. Кроме того, ошибка указывает на порт схемы реестра 8081, поэтому проблема должна быть связана с ним, но я не знаю, что искать дальше.

Комментарии:

1. Можете ли вы отредактировать свой вопрос, чтобы включить подробную информацию о том, где работает ваш реестр схем относительно кода python? Это похоже на проблему с сетью. Например, есть ли у вас реестр схем, запущенный на той же машине, что и брокер? Вы используете Docker?

2. Я отредактировал сообщение с подробностями, которые вы просили.

Ответ №1:

Проблема возникла из-за конфигурации файла schema-registry.properties. Я установил listeners=http://localhost:8081 , но это соответствует 127.0.0.1 и не открыто для доступа с другой машины.

Я изменил его на listeners=http://0.0.0.0:8081 , и это сработало.