#multithreading #rabbitmq #pika
#многопоточность #rabbitmq #pika
Вопрос:
У меня следующий сценарий. У меня есть
- веб-сервер на основе python (который публикует) — (однопоточный и многопоточный)
- Запланированное задание (которое публикует) (однопоточное и многопоточное для выполнения разных заданий)
- Потребитель из очереди rabbitmq (подписывается на раздел rabbitmq) (однопоточный и многопоточный для использования разных сообщений)
В настоящее время я пытаюсь использовать rabbitmq для вышеупомянутого стека. Итак, для приведенного выше случая я хочу создать одно соединение rabbitmq для каждого процесса и использовать несколько каналов для поддержки многопоточных задач. В документации Rabbitmq говорится, что хорошо использовать несколько каналов для поддержки нескольких потоков. Но библиотека pika, похоже, не поддерживает этот сценарий. Вы можете сослаться на следующий пример, который я пробовал
import pika
import threading
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
def test_thread(a: int):
channel = connection.channel()
channel.exchange_declare(exchange='normal_ex', exchange_type='topic')
channel.basic_publish(exchange='normal_ex', routing_key='test', body=str(a))
for i in range(0, 10):
t = threading.Thread(target=test_thread, args=[i])
t.start()
time.sleep(10)
connection.close()
Когда я запускаю вышеуказанную программу, которая использует несколько каналов в нескольких потоках, я получаю следующие ошибки
Потоковое соединение потеряно: ошибка утверждения ((‘_AsyncTransportBase._produce() размер буфера tx ниже потока’, -21, 1))
Ответ №1:
из https://pika.readthedocs.io/en/stable/faq.html#frequently-asked-questions
Является ли Pika потокобезопасным?
Pika не имеет никакого понятия о потоковой обработке в коде. Если вы хотите использовать Pika с потоковой обработкой, убедитесь, что у вас есть соединение Pika для каждого потока, созданное в этом потоке. Небезопасно совместно использовать одно соединение Pika между потоками, за одним исключением: вы можете вызвать метод подключения add_callback_threadsafe из другого потока, чтобы запланировать обратный вызов в активном соединении pika.