#python #apache-kafka #python-asyncio #kafka-consumer-api
Вопрос:
Если приложение публикует объект/словарь JSON payload
в теме Кафки test
с данными , которые либо содержат "type": "snapshot"
, либо "type": "changes"
, как мы настраиваем AIOKafkaConsumer
, чтобы начать использование с самого последнего сообщения "type": snapshot
?
т. е. из сообщения: {"type": "snapshot", "scores": {"foo": 20, "bar": 30, "baz": 30}}
Предположим, что потребуется слишком много времени, чтобы начать потребление со смещения 0.
Спасибо!
import asyncio
import json
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
async def main():
topic = "test"
producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
await producer.start()
payload = {"type": "changes", "scores": {"foo": 5}}
await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
payload = {"type": "snapshot", "scores": {"foo": 10, "bar": 20}}
await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
payload = {"type": "changes", "scores": {"baz": 15}}
await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
payload = {"type": "snapshot", "scores": {"foo": 20, "bar": 30, "baz": 30}}
await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
payload = {"type": "changes", "scores": {"baz": 45}}
await producer.send_and_wait(topic, bytes(json.dumps(payload), "utf-8"))
consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
await consumer.start()
while True:
message = await consumer.getone()
print("message:", message.value)
# HOW TO: Start consuming from `{"type": "snapshot", "scores": {"foo": 20, "bar": 30, "baz": 30}}`
if __name__ == "__main__":
asyncio.run(main())