Rabbitmq pika о многопроцессорной и многопоточной архитектуре

#multithreading #rabbitmq #pika

#многопоточность #rabbitmq #pika

Вопрос:

У меня следующий сценарий. У меня есть

  1. веб-сервер на основе python (который публикует) — (однопоточный и многопоточный)
  2. Запланированное задание (которое публикует) (однопоточное и многопоточное для выполнения разных заданий)
  3. Потребитель из очереди rabbitmq (подписывается на раздел rabbitmq) (однопоточный и многопоточный для использования разных сообщений)

В настоящее время я пытаюсь использовать rabbitmq для вышеупомянутого стека. Итак, для приведенного выше случая я хочу создать одно соединение rabbitmq для каждого процесса и использовать несколько каналов для поддержки многопоточных задач. В документации Rabbitmq говорится, что хорошо использовать несколько каналов для поддержки нескольких потоков. Но библиотека pika, похоже, не поддерживает этот сценарий. Вы можете сослаться на следующий пример, который я пробовал

 import pika
import threading
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))


def test_thread(a: int):
    channel = connection.channel()
    channel.exchange_declare(exchange='normal_ex', exchange_type='topic')
    channel.basic_publish(exchange='normal_ex', routing_key='test', body=str(a))


for i in range(0, 10):
    t = threading.Thread(target=test_thread, args=[i])
    t.start()


time.sleep(10)
connection.close()
 

Когда я запускаю вышеуказанную программу, которая использует несколько каналов в нескольких потоках, я получаю следующие ошибки

Потоковое соединение потеряно: ошибка утверждения ((‘_AsyncTransportBase._produce() размер буфера tx ниже потока’, -21, 1))

Ответ №1:

из https://pika.readthedocs.io/en/stable/faq.html#frequently-asked-questions

Является ли Pika потокобезопасным?

Pika не имеет никакого понятия о потоковой обработке в коде. Если вы хотите использовать Pika с потоковой обработкой, убедитесь, что у вас есть соединение Pika для каждого потока, созданное в этом потоке. Небезопасно совместно использовать одно соединение Pika между потоками, за одним исключением: вы можете вызвать метод подключения add_callback_threadsafe из другого потока, чтобы запланировать обратный вызов в активном соединении pika.