aio_pika. Сообщения были случайно потеряны из очереди

#rabbitmq #pika #aio

#rabbitmq #pika #aio

Вопрос:

У меня есть генератор асинхронного запуска для использования сообщений.

 import asyncio
queue = asyncio.Queue()


async def consume_gen(
            self,
            consume_from,
            prefetch_count,
            priority=None
    ):
        async with self.channel_pool.acquire() as channel:
            await channel.set_qos(prefetch_count=5)
                self.amqp_queue = await channel.declare_queue(
                    'queue_name_for_consuming',
                     durable=True, 
                     auto_delete=False
                )
                await self.amqp_queue.consume(
                    self.get_message, no_ack=False
                )
                await asyncio.sleep(0)
    
            while True:
                try:
                    message = self.queue.get_nowait()
                    yield message
                except asyncio.queues.QueueEmpty:
                    await asyncio.sleep(1)
                    yield None
                except GeneratorExit:
                    return
            else:
                return
 

Это функция обратного вызова для моего генератора, которая отвечает за получение сообщения для очереди и помещает его во внутреннюю очередь asyncio.

 async def get_message(self, message):
    await self.queue.put(message)
 

Моя проблема в том, что некоторые сообщения случайным образом исчезают из очереди. Они не подтверждаются или не застряли ни у одного потребителя, потому что у меня есть журналы для всех шагов. Кроме того, я знаю, что эти сообщения приходят в очередь, из которой они должны быть использованы. Буду благодарен за любую помощь в решении моей проблемы

Ответ №1:

Разрешите это. Проблемы были проще. Моему приложению-регистратору недостаточно уровня журнала)