Использовать из kafka без бесконечного цикла

#apache-kafka #confluent-kafka-python

#python #apache-kafka #confluent-kafka-python

Вопрос:

В настоящее время я использую клиент Confluent kafka python для использования сообщений из темы kafka, и код отлично выполняется внутри while True цикла, как показано в примерах в документации. Однако я хотел бы настроить задание cron, которое использует раздел только один раз в день. Идея заключается в том, что задание проверит тему утром, обработает все сообщения в теме в этот момент времени и затем остановится. Я попытался достичь этого в python следующим образом:

 msg = kafka_consumer.consume()
while msg:
  msg_val = msg.value().decode('utf-8')
  // do something with msg
  msg = kafka_consumer.consume()
  

Проблема в том, что это никогда не заканчивается потреблением чего-либо. Я думаю, что первая строка никогда не получает сообщение с первой попытки. Это работает только с while True но я не хочу, чтобы этот код выполнялся бесконечно, только до тех пор, пока не будет использовано последнее сообщение на данный момент времени.

Ответ №1:

Вы могли бы проверить смещения группы потребителей внутри цикла, а затем прервать цикл, как только вы окажетесь в пределах некоторого порога «end»

Возможно, вы также захотите поиграть с max.poll.records конфигурацией потребителя, чтобы обеспечить больший контроль над тем, сколько записей вы получаете обратно

Комментарии:

1. У меня был consumer.poll (1), и увеличение времени ожидания с 1 до 10 сделало свое дело, спасибо!

2. Как вы думаете, вы могли бы поделиться кодом? Я не умею использовать kafka-python, но мне также нужно читать сообщения и выходить из цикла.