#python #apache-kafka #avro #confluent-schema-registry #confluent-kafka-python
Вопрос:
Я пытаюсь создавать сообщения в формате Avro, используя класс AvroProducer из confluent_kafka. Кафка и Схема-Реестр работают как кластер из 3 узлов в одной сети.
Я читаю схему из строки и инициализирую AvroProducer, как показано ниже:
value_schema = loads("""{"doc": "Messages to be written.",
"namespace": "schemas.avro",
"type": "record",
"name": "kafkagwo",
"fields": [
{"name": "timestamp", "type": "string"},
{"name": "message", "type": "string"}
]}""")
key_schema = loads('{"type": "string"}')
p = AvroProducer({'bootstrap.servers': 'broker1,broker2,broker3',
'schema.registry.url': 'http://broker1:8081'})
Прослушиватель схемы-реестра настроен на http://localhost:8081 на стороне сервера реестра схемы, это брокер1.
Затем попробуйте отправить сообщения, используя приведенный ниже код
value = {'timestamp': timestamp, 'message': message}
p.produce(topic = 'topic-1', partition=0,
key=str('key_0'),
value=value, callback=delivery_report,
value_schema = value_schema, key_schema = key_schema)
Что я получаю, так это
ConnectionError: HTTPConnectionPool(host='broker1', port=8081): Max retries exceeded with url: /subjects/topic-1-value/versions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x00000165C1522D88>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
Я не использую контейнер Docker. Кластеры состоят из 3 отдельных виртуальных машин, в которых установлены и запущены Kafka и схема реестра, поэтому они также не являются автономными.
Код Python выполняется с 4-й виртуальной машины, имеющей доступ к сети и исключение брандмауэра для нее.
На самом деле я могу создавать и использовать сообщения без avro и схемы реестра, поэтому я не ожидаю, что проблема будет связана с сетью, но я открыт для идей.
Когда я пытаюсь использовать Avro со схемой реестра, я получаю вышеуказанную ошибку. Кроме того, ошибка указывает на порт схемы реестра 8081, поэтому проблема должна быть связана с ним, но я не знаю, что искать дальше.
Комментарии:
1. Можете ли вы отредактировать свой вопрос, чтобы включить подробную информацию о том, где работает ваш реестр схем относительно кода python? Это похоже на проблему с сетью. Например, есть ли у вас реестр схем, запущенный на той же машине, что и брокер? Вы используете Docker?
2. Я отредактировал сообщение с подробностями, которые вы просили.
Ответ №1:
Проблема возникла из-за конфигурации файла schema-registry.properties. Я установил listeners=http://localhost:8081
, но это соответствует 127.0.0.1 и не открыто для доступа с другой машины.
Я изменил его на listeners=http://0.0.0.0:8081
, и это сработало.