#python #apache-kafka #async-await #fastapi
#python #apache-kafka #асинхронный -ожидание #fastapi
Вопрос:
Я хотел бы создать API для использования сообщения из темы Kafka с помощью FastAPI.
Я пытаюсь создать точку входа с помощью приведенного ниже кода:
import asyncio
import logging
import json
from aiokafka import AIOKafkaConsumer
from fastapi import APIRouter, HTTPException
from app.config import (
TOPIC_INGESTED_REQUEST,
KAFKA_BOOTSTRAP_SERVER,
)
logger = logging.getLogger(__name__)
requests_router = r = APIRouter()
loop = asyncio.get_event_loop()
@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
# define consumer
consumer = AIOKafkaConsumer(
TOPIC_INGESTED_REQUEST,
loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
group_id=group_id, # Consumer must be in a group to commit
enable_auto_commit=True, # Is True by default anyway
auto_commit_interval_ms=1000, # Autocommit every second
auto_offset_reset="earliest", # If committed offset not found, start from beginning
)
# start consumer
await consumer.start()
retrieved_requests = []
try:
data = await consumer.getmany()
for tp, messages in data.items():
for message in messages:
# Process message
retrieved_requests.append({
"key": messages.key.decode("utf-8"),
"value": json.loads(message.value.decode("utf-8")),
})
except Exception as e:
logger.error(f"Error when trying to consume request for {group_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
await consumer.stop()
return retrieved_requests
Я всегда заканчиваю пустым списком. Я не понимаю, почему я не могу получать сообщения из настроенной темы.
Я также пробую «классический» асинхронный цикл for в документе aiokafka. Но это тоже не работает. Однако в классическом цикле я вижу, что создается потребитель и некоторые сообщения передаются. С getmany
ним всегда пусто.
Спасибо!
Ответ №1:
Мне наконец удалось найти решение с getmany
помощью from aiokafka
consumer.
Идея состоит в том, чтобы попытаться получить сообщения с помощью consumer и установить тайм-аут, если нет (больше) сообщений. Я также установил максимальное количество сообщений для извлечения на случай, если сообщений слишком много, но это должно быть необязательным.
Вот код:
requests_router = r = APIRouter()
loop = asyncio.get_event_loop()
def kafka_json_deserializer(serialized):
return json.loads(serialized)
@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
"""
Consume a list of 'Requests' from 'TOPIC_INGESTED_REQUEST'.
"""
consumer = AIOKafkaConsumer(
TOPIC_INGESTED_REQUEST,
loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
group_id=group_id, # unique identifier for each sidecar
enable_auto_commit=True,
auto_commit_interval_ms=1000, # commit every second
auto_offset_reset="earliest", # If committed offset not found, start from beginning
value_deserializer=kafka_json_deserializer,
)
logger.info(
f"Start consumer with group ID: '{group_id}' on topic '{TOPIC_INGESTED_REQUEST}'."
)
await consumer.start()
logger.info("Consumer started.")
retrieved_requests = []
try:
result = await consumer.getmany(
timeout_ms=CONSUMER_TIMEOUT_MS, max_records=MAX_RECORDS_PER_CONSUMER
)
logger.info(f"Get {len(result)} messages in {TOPIC_INGESTED_REQUEST}.")
for tp, messages in result.items():
if messages:
for message in messages:
retrieved_requests.append(
{"key": message.key.decode("utf-8"), "value": message.value,}
)
# await consumer.commit({tp: messages[-1].offset 1})
except Exception as e:
logger.error(
f"Error when trying to consume request for {group_id} on topic {TOPIC_INGESTED_REQUEST}: {str(e)}"
)
raise HTTPException(status_code=500, detail=str(e))
finally:
await consumer.stop()
return retrieved_requests