#python #apache-kafka #consumer #confluent-kafka-python
Вопрос:
Я пытаюсь выполнить пакетное etl с помощью пакетов Python confluent_kafka по адресу 0 0 * * * каждый день. Я знаю, что в моем потоке есть 4 раздела, но их можно изменить, так есть ли способ проверить общее количество разделов в определенной теме? Моему потребителю это нравится;
from confluent_kafka import Consumer, KafkaError
messages = list()
partition_counter = 0
tnof_partition = 4
while True:
msg = self.consumer.poll(0.1)
if msg is None:
continue
elif not msg.error():
event = json.loads(msg.value().decode('utf-8'))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached {0}/{1}"
.format(msg.topic(), msg.partition()))
partition_counter = 1
if(partition_counter == tnof_partition):
self.consumer.commit()
self.consumer.close()
break
Также я был бы признателен, если бы вы могли показать альтернативные способы реализации пакетного потребителя. Спасибо
Комментарии:
1. «но это может быть изменено» — затем включите аутентификацию и запретите изменения клиента администратора, тогда вам не понадобится это утверждение
Ответ №1:
list_topics()
Метод потребителя может предоставить карту Topics
состояний TopicMetadata
, которая в конечном итоге есть partitions
в нем.