Как разрешить пользователю AIOKafkaConsumer использовать тему Кафки, начиная с самого последнего сообщения с определенным значением

#python #apache-kafka #python-asyncio #kafka-consumer-api

Вопрос:

Если приложение публикует объект/словарь JSON payload в теме Кафки test с данными , которые либо содержат "type": "snapshot" , либо "type": "changes" , как мы настраиваем AIOKafkaConsumer , чтобы начать использование с самого последнего сообщения "type": snapshot ?

т. е. из сообщения: {"type": "snapshot", "scores": {"foo": 20, "bar": 30, "baz": 30}}

Предположим, что потребуется слишком много времени, чтобы начать потребление со смещения 0.

Спасибо!

 import asyncio
import json

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def main():
    topic = "test"

    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()

    payload = {"type": "changes", "scores": {"foo": 5}}
    await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
    payload = {"type": "snapshot", "scores": {"foo": 10, "bar": 20}}
    await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
    payload = {"type": "changes", "scores": {"baz": 15}}
    await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
    payload = {"type": "snapshot", "scores": {"foo": 20, "bar": 30, "baz": 30}}
    await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
    payload = {"type": "changes", "scores": {"baz": 45}}
    await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))

    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    while True:
        message = await consumer.getone()
        print("message:", message.value)

        # HOW TO: Start consuming from `{"type": "snapshot", "scores": {"foo": 20, "bar": 30, "baz": 30}}`

if __name__ == "__main__":
    asyncio.run(main())