#rabbitmq #python-3.8 #pika
Вопрос:
Я создаю автономное приложение, которое содержит API и брокерский Rabbitmq. Мое приложение должно отвечать в конечной точке. Мой rabbitmq в тесте обрабатывает мои данные и возвращает их в обычном режиме.
Итак, в своем API я создал библиотеку и поместил свой код производителя в этот путь. Я создаю класс и вызываю его, но отображается ошибка:
Этот же код в тесте должен выполняться нормально.
Здесь будет мой код продукта:
import base64 import os from decouple import config import pika class Producer: def __init__(self, corr_id, img): self.corr_id = id self.img = img def producer(self): BROKER = config('BROKER_URL') BROKER_PORT = config('BROKER_PORT') USER = config('USER') PASSWORD = config('PASSWORD') VHOST = config('VHOST') uri = f'amqp://{USER}:{PASSWORD}@{BROKER}:{BROKER_PORT}/{VHOST}' connection = pika.BlockingConnection(uri) channel = connection.channel() queue_declared = channel.queue_declare('', exclusive=True) callback_queue = queue_declared.method.queue channel.exchange_declare(exchange='image_hash', exchange_type='headers') def on_response(ch, method, props, body): if corr_id == props.correlation_id: print(body) response = str(body) channel.basic_consume( queue = callback_queue, on_message_callback = on_response, auto_ack = True ) corr_id = str(self.corr_id) published_message = channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( correlation_id = corr_id, delivery_mode=2, reply_to=callback_queue ), body = self.img ) channel.start_consuming() return published_message
Код теста, который обычно отправляет и получает информацию, является:
import base64 from decouple import config import pika import uuid BROKER = config('BROKER_URL') BROKER_PORT = config('BROKER_PORT') USER = config('USERRABBITMQ') PASSWORD = config('PASSRABBITMQ') VHOST = config('VHOST') credentials = pika.PlainCredentials(USER, PASSWORD) parameters = pika.ConnectionParameters(BROKER, BROKER_PORT, VHOST, credentials ) def test_consumer(data_img): connection = pika.BlockingConnection(parameters) channel = connection.channel() queue_declared = channel.queue_declare('', exclusive=True) callback_queue = queue_declared.method.queue channel.exchange_declare(exchange='image_hash', exchange_type='headers') def on_response(ch, method, props, body): if corr_id == props.correlation_id: print(body) response = str(body) channel.basic_consume( queue = callback_queue, on_message_callback = on_response, auto_ack = True ) corr_id = str(uuid.uuid4()) published_message = channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( correlation_id = corr_id, delivery_mode=2, reply_to=callback_queue ), body = data_img ) channel.start_consuming() return published_message file = 'C:\Users\ER20259240\workspace\image_hash\services\consumers\tests\img\front.jpg' with open(file, 'rb') as img_file: data_img = base64.b64encode(img_file.read()) response = test_consumer(data_img)
I don’t know why in test code to be ok and in my class the error happen!
My api to use onle a route /image and receive um id and image. Was build with fastapi.
My route:
@app.post('/images') async def images(files: List[UploadFile] = File(...), id:str = Form(...)): # Aqui deve ser feita a chamada da classe para responder a requisição de hash. for file in files: corr_id = id name = file.filename producer = Producer(corr_id, file) hash = producer.producer() return hash
Почему произошла ошибка и как я могу ее решить?