Потребитель Python Kafka не получает сообщение с самого начала?

#python #apache-kafka #kafka-consumer-api

#python #apache-kafka #kafka-consumer-api

Вопрос:

Я установил Kafka на свой ПК с Windows. Создал тему quickstart-events и отправил несколько сообщений. Запуск консольного потребителя с параметром --from-beginning может получать сообщения.

 .binwindowskafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Picked up JAVA_TOOL_OPTIONS: -agentpath:"C:WINDOWSsystem32AternityJavaJavaHookLoader.dll"="C:ProgramDataAternityhooks"
msg1
msg2
msg3
msg4
  

Однако при запуске кода Python с параметром auto_offset_reset='earliest' сообщение будет напечатано в первый раз. Значит, он не печатает никаких сообщений после первого запуска?

 from kafka import KafkaConsumer, KafkaProducer
consumer = KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest')
for msg in consumer:
    print(msg)
  

Комментарии:

1. Я пробовал это с earliest помощью, но он по-прежнему не печатает никаких сообщений. Потребитель консоли все еще печатает.

2. Я перезапустил свой компьютер, и теперь код работает.

3. Да, запускаю его снова, и теперь ничего не печатается. Я не указывал никакой группы потребителей. Есть ли какой-нибудь способ заставить его печатать сообщение каждый раз?

Ответ №1:

TL; DR

вам необходимо предоставить новый group.id каждый раз, когда вы хотите прочитать тему с начала, сохраняя параметр auto_offset_reset=’самый ранний’:

 KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest', group_id='newGroup')
  

Если ваш код печатает выходные данные при первом запуске, но больше не при последующих запусках, и ваша проблема также решается при перезапуске Kafka (вашего ПК), вы сталкиваетесь с концепцией группы потребителей в Kafka. Поскольку это довольно важная концепция, я настоятельно рекомендую ознакомиться с ней здесь.

Группа потребителей приложения гарантирует, что оно не прочитает сообщение дважды. У каждого потребителя есть имя группы потребителей (даже если вы можете не видеть его непосредственно в своем коде). Позиция смещения группы потребителей хранится во внутреннем разделе Kafka.

Теперь, выполняя код в первый раз после перезапуска Kafka, Kafka еще не знает группу потребителей и применяет политику, указанную в конфигурации auto_offset_reset . В вашем случае он считывается с самого раннего доступного коммита. Во второй раз, когда вы запускаете свой код, ему не нужно изучать эту политику, потому что он уже знает потребителя и не позволит потребителю снова использовать сообщение.

Поэтому, если вы перезапускаете Kafka, это внутреннее знание потребителя также исчезает, и снова применяется политика auto_offset_reset.

Просто имейте в виду, что это скорее хак, и его не следует часто выполнять в производственных системах, поскольку группы потребителей будут простаивать.

В качестве примечания к sid: пользователь console-consumer создает новую группу потребителей каждый раз, когда вы ее запускаете. Параметр «—from-beginning» просто гарантирует, что для auto_offset_reset установлено значение «самый ранний».