#rabbitmq #pika #aio
#rabbitmq #pika #aio
Вопрос:
У меня есть генератор асинхронного запуска для использования сообщений.
import asyncio
queue = asyncio.Queue()
async def consume_gen(
self,
consume_from,
prefetch_count,
priority=None
):
async with self.channel_pool.acquire() as channel:
await channel.set_qos(prefetch_count=5)
self.amqp_queue = await channel.declare_queue(
'queue_name_for_consuming',
durable=True,
auto_delete=False
)
await self.amqp_queue.consume(
self.get_message, no_ack=False
)
await asyncio.sleep(0)
while True:
try:
message = self.queue.get_nowait()
yield message
except asyncio.queues.QueueEmpty:
await asyncio.sleep(1)
yield None
except GeneratorExit:
return
else:
return
Это функция обратного вызова для моего генератора, которая отвечает за получение сообщения для очереди и помещает его во внутреннюю очередь asyncio.
async def get_message(self, message):
await self.queue.put(message)
Моя проблема в том, что некоторые сообщения случайным образом исчезают из очереди. Они не подтверждаются или не застряли ни у одного потребителя, потому что у меня есть журналы для всех шагов. Кроме того, я знаю, что эти сообщения приходят в очередь, из которой они должны быть использованы. Буду благодарен за любую помощь в решении моей проблемы
Ответ №1:
Разрешите это. Проблемы были проще. Моему приложению-регистратору недостаточно уровня журнала)