ошибка kafka-python, когда я не получал сообщения с длительного периода

#python #apache-kafka

#python #apache-kafka

Вопрос:

Я использую библиотеку kafka_python== 2.0.0 ,

С помощью приведенного ниже фрагмента кода, если я не получаю сообщение в течение 1 часа, следующее сообщение, отправленное в теме kafka, не обрабатывается потребителем, однако цикл не останавливается.

Я бы хотел, чтобы мой слушатель работал 24/24 без потери соединения

  consumer = KafkaConsumer(
    os.environ.get('MY_TOPIC'),
    bootstrap_servers=broker,
    api_version=my_version,
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='GSSAPI',
    sasl_kerberos_service_name=service_name,
    group_id='MY_GRP_ID',
    max_poll_records=1
    
)

try:
    for msg in consumer:
        ##PROCESS function ... 
        consumer.commit()
       
finally:
    consumer.close()
  

Комментарии:

1. У меня нет журнала или ошибки, цикл просто неопределенно ждет следующей итерации

Ответ №1:

Я, наконец, использую метод опроса :

 from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
while True:
    # Response format is {TopicPartiton('topic1', 1): [msg1, msg2]}
    msg_pack = consumer.poll(timeout_ms=500)

    for tp, messages in msg_pack.items():
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print ("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
                                              message.offset, message.key,
                                              message.value))
  

Преимущество моей точки зрения на этот синтаксис заключается в лучшей видимости того, как извлекать сообщения. это не в моем примере, но я мог бы лучше управлять остановкой программы, запрашивая сигнал sigterm