#python #python-3.x #rabbitmq #rabbitmq-exchange
Вопрос:
Я использую RabbitMQ в качестве очереди различных сообщений. Когда я получаю эти сообщения от двух разных потребителей из одной очереди, я обрабатываю их и вставляю результаты обработки в базу данных:
def consumer_callback(self, channel, delivery_tag, properties, message): result = make_some_processing(message) insert_to_db(result) channel.basic_ack(delivery_tag)
Я хочу массово потреблять сообщения из очереди, что уменьшит нагрузку на БД. Поскольку RabbitMQ не поддерживает массовое чтение сообщений потребителями, я собираюсь сделать что-то вроде этого:
some_messages_list = [] def consumer_callback(self, channel, delivery_tag, properties, message): some_messages_list.append({delivery_tag: message}) if len(some_messages_list) gt; 1000: results_list = make_some_processing_bulk(some_messages_list) insert_to_db_bulk(results_list) for tag in some_messages_list: channel.basic_ack(tag) some_messages_list.clear()
- Сообщения находятся в очереди до того, как все они будут полностью обработаны
- Если потребитель падает или отключается — сообщения остаются в безопасности
Что вы думаете об этом решении? Если все в порядке, как я могу получить все незарегистрированные сообщения заново, если потребитель упадет?
Комментарии:
1. В RabbitMQ может быть функция блокировки подглядывания, при которой потребитель может заблокировать сообщение и подтвердить его завершение, но разблокировать сообщение, и оно может быть прочитано другими, если что-то пойдет не так
2. Похоже, у него нет этой функции