#python #apache-kafka #avro #kafka-python #confluent-kafka-python
#python #apache-kafka #avro #kafka-python #confluent-kafka-python
Вопрос:
Я пытаюсь прочитать Kafka из python, но получаю сообщение None, ошибок в CLI нет. Я использую перенаправление портов на хост назначения через putty, а не тестирую порты через telnet — все работает нормально. Более того, я использую kafkacat в Debian (WSL), и он отлично работает!
kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081
Я использую PyCharm, мой код приведен ниже в тексте. Как мне выполнить отладку?
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError
topics = ['topic1', 'topic2']
c = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'smallest',
'schema.registry.url': 'http://localhost:28081',
'api.version.request': True
})
c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])
while True:
try:
msg = c.poll(1)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
print('Message None')
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()
как
Ответ №1:
Первое, что я сделаю, это удостоверюсь, что сообщения приходят по вашим темам с помощью kafka-avro-console-consumer
инструмента.
Затем в вашем приложении вы можете попытаться увеличить уровень журнала:
c = AvroConsumer({
# ... your config here
'log_level': 7,
'debug': 'all',
})
Вы можете увидеть различные параметры здесь: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Но я полагаю, что ваша проблема связана с тем, как вы назначаете разделы. Если вы используете subscribe
, то разделы автоматически назначаются вашему потребителю кластером. Вы можете добавить обратный вызов при подписке на вы можете видеть, какие разделы назначены вашему потребителю, но вам не обязательно делать это самостоятельно. Смотрите https://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html#confluent_kafka .Потребитель.подписаться
Комментарии:
1. Спасибо за строку отладки! Я добавляю комментарий к strin
#tp = TopicPartition(topics[0], 0, 0) #c.assign([tp])
и теперь у меня есть сообщение отладки, где я вижу смещение 6 сообщений, но я не понимаю, почему это не сработало. link_bug_text . @Arthur, ты можешь помочь?