Как получить последнее смещение от каждого раздела, используя kafka-python?

#python #apache-kafka

#python #apache-kafka

Вопрос:

Я пытаюсь получить последнее смещение (не зафиксированное смещение) из каждого раздела для данной темы.

 from kafka import KafkaConsumer, TopicPartition

topic = 'test-topic'
broker = 'localhost:9092'

consumer = KafkaConsumer(bootstrap_servers=broker)

tp = TopicPartition(topic, 0)        #1
consumer.assign([tp])                #2
consumer.seek_to_end(tp)             #3
last_offset = consumer.position(tp)  #4

for i in consumer.partitions_for_topic(topic):
    tp = TopicPartition(topic, i)
    consumer.assign([tp])
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    print(last_offset)
  

Предыдущий код работает и выводит смещение каждого раздела. Однако обратите внимание, что у меня есть те же 4 строки вне цикла, что и внутри цикла. Если я удалю любую из строк # 1 — # 4 (4 строки, непосредственно предшествующие циклу for) Я получаю сообщение об ошибке:
Файл «check_kafka_offset.py «, строка 19, в для i в consumer.partitions_for_topic(тема): ошибка типа: объект ‘NoneType’ не может быть повторен

Почему мне нужно иметь 4 строки перед циклом for?

Ответ №1:

Вы можете использовать end_offsets(partitions) функцию в этом клиенте, чтобы получить последнее смещение для указанных разделов. Обратите внимание, что возвращаемое смещение является следующим смещением, то есть текущим концом 1. Документация здесь.

Редактировать: Пример реализации:

 from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

BOOTSTRAP="""cluster:9092"""
API_KEY="""redacted"""
API_SECRET="""redacted"""
TOPIC="python-test"

consumer = KafkaConsumer(
    group_id="my-group",
    bootstrap_servers=[BOOTSTRAP],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=API_KEY,
    sasl_plain_password=API_SECRET,
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    auto_offset_reset='earliest'
)

PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
    PARTITIONS.append(TopicPartition(TOPIC, partition))
    
end_offsets = consumer.end_offsets(PARTITIONS)
print(end_offsets)
  

и end_offsets выглядит так:

 {TopicPartition(topic=u'python-test', partition=0): 5,
 TopicPartition(topic=u'python-test', partition=1): 20,
 TopicPartition(topic=u'python-test', partition=2): 0}
  

Комментарии:

1. Не могли бы вы немного подробнее рассказать об этом? end_offsets (partitions) принимает список разделов, но тогда я предполагаю, что мне нужно будет создать экземпляр потребителя с определенной темой? Это также не объясняет ошибку, которую я получаю, если я удаляю любую из строк, обозначенных # 1 — # 4

2. добавлен пример реализации

3. @stewvsshark Мне не нужно было создавать экземпляр моего потребителя с темой, этот код вызывается после создания стандартного потребителя. Если это правильный ответ, пожалуйста, отметьте его галочкой.

4. Я попробовал этот код точно так, как написано выше (с моим исходным кодом, создающим экземпляр потребителя), и я все еще получаю «TypeError: объект ‘NoneType’ не повторяется» в строке, определяющей цикл for. Вероятно, мне следовало упомянуть, что я использую python 2.7, а не python 3 — это проблема, которая может вызывать эту ошибку?

5. @stewvsshark Я только что обновил полный скрипт, который я только что протестировал (с включенным ведением журнала отладки) на Python 2.7. Попробуйте его с журналом отладки.

Ответ №2:

Вот простая и хорошо документированная функция:

 from kafka import TopicPartition
def getTopicInfos(consumer, topic: str):
    """
    Get topic's informations like partitions with their last offsets.
    Example of result: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']})

    - Parameters:
      consumer: A Kafka consumer.
      topic: A topic name.

    - Return:
      The topic's informations.
    """
    # Get topic-partition pairs
    # E.g: [TopicPartition(topic='myTopic', partition=0), TopicPartition(topic='myTopic', partition=1)]
    tp = [TopicPartition(topic, partition) for partition in consumer.partitions_for_topic(topic)]

    # Get last offsets
    # E.g: {TopicPartition(topic='myTopic', partition=0): 47, TopicPartition(topic='myTopic', partition=1): 98}
    tplo = consumer.end_offsets(tp)

    # Format partition-lastOffset pairs
    # E.g: ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']
    plo = ['{'   f'"partition": {item.partition}, "lastOffset": {tplo.get(item)}'   '}' for item in tplo]

    # Concat topic with partition-lastOffset pairs
    # E.g: {'topic': 'myTopic', 'partitions': ['{"partition": 0, "lastOffset": 47}', '{"partition": 1, "lastOffset": 98}']})
    tplo = {"topic": topic, "partitions": plo}

    # Return the result
    return tplo