#apache-kafka #confluent-kafka-python
#python #apache-kafka #confluent-kafka-python
Вопрос:
В настоящее время я использую клиент Confluent kafka python для использования сообщений из темы kafka, и код отлично выполняется внутри while True
цикла, как показано в примерах в документации. Однако я хотел бы настроить задание cron, которое использует раздел только один раз в день. Идея заключается в том, что задание проверит тему утром, обработает все сообщения в теме в этот момент времени и затем остановится. Я попытался достичь этого в python следующим образом:
msg = kafka_consumer.consume()
while msg:
msg_val = msg.value().decode('utf-8')
// do something with msg
msg = kafka_consumer.consume()
Проблема в том, что это никогда не заканчивается потреблением чего-либо. Я думаю, что первая строка никогда не получает сообщение с первой попытки. Это работает только с while True
но я не хочу, чтобы этот код выполнялся бесконечно, только до тех пор, пока не будет использовано последнее сообщение на данный момент времени.
Ответ №1:
Вы могли бы проверить смещения группы потребителей внутри цикла, а затем прервать цикл, как только вы окажетесь в пределах некоторого порога «end»
Возможно, вы также захотите поиграть с max.poll.records
конфигурацией потребителя, чтобы обеспечить больший контроль над тем, сколько записей вы получаете обратно
Комментарии:
1. У меня был consumer.poll (1), и увеличение времени ожидания с 1 до 10 сделало свое дело, спасибо!
2. Как вы думаете, вы могли бы поделиться кодом? Я не умею использовать kafka-python, но мне также нужно читать сообщения и выходить из цикла.