#python #apache-kafka #kafka-consumer-api
#python #apache-kafka #kafka-consumer-api
Вопрос:
Я установил Kafka на свой ПК с Windows. Создал тему quickstart-events
и отправил несколько сообщений. Запуск консольного потребителя с параметром --from-beginning
может получать сообщения.
.binwindowskafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Picked up JAVA_TOOL_OPTIONS: -agentpath:"C:WINDOWSsystem32AternityJavaJavaHookLoader.dll"="C:ProgramDataAternityhooks"
msg1
msg2
msg3
msg4
Однако при запуске кода Python с параметром auto_offset_reset='earliest'
сообщение будет напечатано в первый раз. Значит, он не печатает никаких сообщений после первого запуска?
from kafka import KafkaConsumer, KafkaProducer
consumer = KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest')
for msg in consumer:
print(msg)
Комментарии:
1. Я пробовал это с
earliest
помощью, но он по-прежнему не печатает никаких сообщений. Потребитель консоли все еще печатает.2. Я перезапустил свой компьютер, и теперь код работает.
3. Да, запускаю его снова, и теперь ничего не печатается. Я не указывал никакой группы потребителей. Есть ли какой-нибудь способ заставить его печатать сообщение каждый раз?
Ответ №1:
TL; DR
вам необходимо предоставить новый group.id каждый раз, когда вы хотите прочитать тему с начала, сохраняя параметр auto_offset_reset=’самый ранний’:
KafkaConsumer('quickstart-events', bootstrap_servers=['localhost:9092'], auto_offset_reset='smallest', group_id='newGroup')
Если ваш код печатает выходные данные при первом запуске, но больше не при последующих запусках, и ваша проблема также решается при перезапуске Kafka (вашего ПК), вы сталкиваетесь с концепцией группы потребителей в Kafka. Поскольку это довольно важная концепция, я настоятельно рекомендую ознакомиться с ней здесь.
Группа потребителей приложения гарантирует, что оно не прочитает сообщение дважды. У каждого потребителя есть имя группы потребителей (даже если вы можете не видеть его непосредственно в своем коде). Позиция смещения группы потребителей хранится во внутреннем разделе Kafka.
Теперь, выполняя код в первый раз после перезапуска Kafka, Kafka еще не знает группу потребителей и применяет политику, указанную в конфигурации auto_offset_reset . В вашем случае он считывается с самого раннего доступного коммита. Во второй раз, когда вы запускаете свой код, ему не нужно изучать эту политику, потому что он уже знает потребителя и не позволит потребителю снова использовать сообщение.
Поэтому, если вы перезапускаете Kafka, это внутреннее знание потребителя также исчезает, и снова применяется политика auto_offset_reset.
Просто имейте в виду, что это скорее хак, и его не следует часто выполнять в производственных системах, поскольку группы потребителей будут простаивать.
В качестве примечания к sid: пользователь console-consumer создает новую группу потребителей каждый раз, когда вы ее запускаете. Параметр «—from-beginning» просто гарантирует, что для auto_offset_reset установлено значение «самый ранний».