#python #apache-kafka
#python #apache-kafka
Вопрос:
Я пытаюсь получить последнее смещение (не зафиксированное смещение) из каждого раздела для данной темы.
from kafka import KafkaConsumer, TopicPartition
topic = 'test-topic'
broker = 'localhost:9092'
consumer = KafkaConsumer(bootstrap_servers=broker)
tp = TopicPartition(topic, 0) #1
consumer.assign([tp]) #2
consumer.seek_to_end(tp) #3
last_offset = consumer.position(tp) #4
for i in consumer.partitions_for_topic(topic):
tp = TopicPartition(topic, i)
consumer.assign([tp])
consumer.seek_to_end(tp)
last_offset = consumer.position(tp)
print(last_offset)
Предыдущий код работает и выводит смещение каждого раздела. Однако обратите внимание, что у меня есть те же 4 строки вне цикла, что и внутри цикла. Если я удалю любую из строк # 1 — # 4 (4 строки, непосредственно предшествующие циклу for) Я получаю сообщение об ошибке:
Файл «check_kafka_offset.py «, строка 19, в для i в consumer.partitions_for_topic(тема): ошибка типа: объект ‘NoneType’ не может быть повторен
Почему мне нужно иметь 4 строки перед циклом for?
Ответ №1:
Вы можете использовать end_offsets(partitions)
функцию в этом клиенте, чтобы получить последнее смещение для указанных разделов. Обратите внимание, что возвращаемое смещение является следующим смещением, то есть текущим концом 1. Документация здесь.
Редактировать: Пример реализации:
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
BOOTSTRAP="""cluster:9092"""
API_KEY="""redacted"""
API_SECRET="""redacted"""
TOPIC="python-test"
consumer = KafkaConsumer(
group_id="my-group",
bootstrap_servers=[BOOTSTRAP],
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username=API_KEY,
sasl_plain_password=API_SECRET,
value_deserializer=lambda m: json.loads(m.decode('ascii')),
auto_offset_reset='earliest'
)
PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
PARTITIONS.append(TopicPartition(TOPIC, partition))
end_offsets = consumer.end_offsets(PARTITIONS)
print(end_offsets)
и end_offsets
выглядит так:
{TopicPartition(topic=u'python-test', partition=0): 5,
TopicPartition(topic=u'python-test', partition=1): 20,
TopicPartition(topic=u'python-test', partition=2): 0}
Комментарии:
1. Не могли бы вы немного подробнее рассказать об этом? end_offsets (partitions) принимает список разделов, но тогда я предполагаю, что мне нужно будет создать экземпляр потребителя с определенной темой? Это также не объясняет ошибку, которую я получаю, если я удаляю любую из строк, обозначенных # 1 — # 4
2. добавлен пример реализации
3. @stewvsshark Мне не нужно было создавать экземпляр моего потребителя с темой, этот код вызывается после создания стандартного потребителя. Если это правильный ответ, пожалуйста, отметьте его галочкой.
4. Я попробовал этот код точно так, как написано выше (с моим исходным кодом, создающим экземпляр потребителя), и я все еще получаю «TypeError: объект ‘NoneType’ не повторяется» в строке, определяющей цикл for. Вероятно, мне следовало упомянуть, что я использую python 2.7, а не python 3 — это проблема, которая может вызывать эту ошибку?
5. @stewvsshark Я только что обновил полный скрипт, который я только что протестировал (с включенным ведением журнала отладки) на Python 2.7. Попробуйте его с журналом отладки.
Ответ №2:
Вот простая и хорошо документированная функция:
from kafka import TopicPartition
def getTopicInfos(consumer, topic: str):
"""
Get topic's informations like partitions with their last offsets.
Example of result: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']})
- Parameters:
consumer: A Kafka consumer.
topic: A topic name.
- Return:
The topic's informations.
"""
# Get topic-partition pairs
# E.g: [TopicPartition(topic='myTopic', partition=0), TopicPartition(topic='myTopic', partition=1)]
tp = [TopicPartition(topic, partition) for partition in consumer.partitions_for_topic(topic)]
# Get last offsets
# E.g: {TopicPartition(topic='myTopic', partition=0): 47, TopicPartition(topic='myTopic', partition=1): 98}
tplo = consumer.end_offsets(tp)
# Format partition-lastOffset pairs
# E.g: ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']
plo = ['{' f'"partition": {item.partition}, "lastOffset": {tplo.get(item)}' '}' for item in tplo]
# Concat topic with partition-lastOffset pairs
# E.g: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']})
tplo = {"topic": topic, "partitions": plo}
# Return the result
return tplo