Использует сообщения Kafka с помощью FastAPI и aiokafka

#python #apache-kafka #async-await #fastapi

#python #apache-kafka #асинхронный -ожидание #fastapi

Вопрос:

Я хотел бы создать API для использования сообщения из темы Kafka с помощью FastAPI.

Я пытаюсь создать точку входа с помощью приведенного ниже кода:

 import asyncio
import logging
import json
from aiokafka import AIOKafkaConsumer
from fastapi import APIRouter, HTTPException

from app.config import (
    TOPIC_INGESTED_REQUEST,
    KAFKA_BOOTSTRAP_SERVER,
)

logger = logging.getLogger(__name__)

requests_router = r = APIRouter()

loop = asyncio.get_event_loop()

@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    # define consumer
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # Consumer must be in a group to commit
        enable_auto_commit=True,  # Is True by default anyway
        auto_commit_interval_ms=1000,  # Autocommit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
    )

    # start consumer
    await consumer.start()

    retrieved_requests = []
    try:
        data = await consumer.getmany()
        for tp, messages in data.items():
            for message in messages:
                # Process message
                retrieved_requests.append({
                        "key": messages.key.decode("utf-8"),
                        "value": json.loads(message.value.decode("utf-8")),
                    })
    except Exception as e:
        logger.error(f"Error when trying to consume request for {group_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        await consumer.stop()
    return retrieved_requests
  

Я всегда заканчиваю пустым списком. Я не понимаю, почему я не могу получать сообщения из настроенной темы.

Я также пробую «классический» асинхронный цикл for в документе aiokafka. Но это тоже не работает. Однако в классическом цикле я вижу, что создается потребитель и некоторые сообщения передаются. С getmany ним всегда пусто.

Спасибо!

Ответ №1:

Мне наконец удалось найти решение с getmany помощью from aiokafka consumer.

Идея состоит в том, чтобы попытаться получить сообщения с помощью consumer и установить тайм-аут, если нет (больше) сообщений. Я также установил максимальное количество сообщений для извлечения на случай, если сообщений слишком много, но это должно быть необязательным.

Вот код:

 requests_router = r = APIRouter()

loop = asyncio.get_event_loop()


def kafka_json_deserializer(serialized):
    return json.loads(serialized)


@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    """
    Consume a list of 'Requests' from 'TOPIC_INGESTED_REQUEST'.
    """
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # unique identifier for each sidecar
        enable_auto_commit=True,
        auto_commit_interval_ms=1000,  # commit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
        value_deserializer=kafka_json_deserializer,
    )

    logger.info(
        f"Start consumer with group ID: '{group_id}' on topic '{TOPIC_INGESTED_REQUEST}'."
    )
    await consumer.start()
    logger.info("Consumer started.")

    retrieved_requests = []
    try:
        result = await consumer.getmany(
            timeout_ms=CONSUMER_TIMEOUT_MS, max_records=MAX_RECORDS_PER_CONSUMER
        )
        logger.info(f"Get {len(result)} messages in {TOPIC_INGESTED_REQUEST}.")
        for tp, messages in result.items():
            if messages:
                for message in messages:
                    retrieved_requests.append(
                        {"key": message.key.decode("utf-8"), "value": message.value,}
                    )
                # await consumer.commit({tp: messages[-1].offset   1})
    except Exception as e:
        logger.error(
            f"Error when trying to consume request for {group_id} on topic {TOPIC_INGESTED_REQUEST}: {str(e)}"
        )
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        await consumer.stop()

    return retrieved_requests