#python #apache-kafka
#python #apache-kafka
Вопрос:
Я использую библиотеку kafka_python== 2.0.0 ,
С помощью приведенного ниже фрагмента кода, если я не получаю сообщение в течение 1 часа, следующее сообщение, отправленное в теме kafka, не обрабатывается потребителем, однако цикл не останавливается.
Я бы хотел, чтобы мой слушатель работал 24/24 без потери соединения
consumer = KafkaConsumer(
os.environ.get('MY_TOPIC'),
bootstrap_servers=broker,
api_version=my_version,
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='GSSAPI',
sasl_kerberos_service_name=service_name,
group_id='MY_GRP_ID',
max_poll_records=1
)
try:
for msg in consumer:
##PROCESS function ...
consumer.commit()
finally:
consumer.close()
Комментарии:
1. У меня нет журнала или ошибки, цикл просто неопределенно ждет следующей итерации
Ответ №1:
Я, наконец, использую метод опроса :
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
while True:
# Response format is {TopicPartiton('topic1', 1): [msg1, msg2]}
msg_pack = consumer.poll(timeout_ms=500)
for tp, messages in msg_pack.items():
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
message.offset, message.key,
message.value))
Преимущество моей точки зрения на этот синтаксис заключается в лучшей видимости того, как извлекать сообщения. это не в моем примере, но я мог бы лучше управлять остановкой программы, запрашивая сигнал sigterm