RabbitMQ (aio-pika) connection error: [Errno 104] Connection reset by peer

#python #websocket #rabbitmq #pika

Question:

We have a WebSocket server that does the following:

  • Use case: streaming audio data is sent from the web UI to this server over a WebSocket
  • The WebSocket server uses aio_pika to connect to RabbitMQ from our Python client and publish the audio data
  • A RabbitMQ connection is opened for each new WebSocket connection

WEBSOCKET_CODE

@app.websocket('/ws')
async def websocket(websocket: WebSocket, meeting_id: str, user_id: str):
    global connections_ws, counter, audio
    conn_id = meeting_id
    print('before try')
    try:
        if not conn_id or conn_id == "-1":
            await websocket.close()
        print('before')
        await connections_ws.connect(websocket, conn_id)
        print('after')
        # hard coded user-id for now
        payload = {'user_id': user_id,
                   'meeting_id': meeting_id}

        print('http://{}/speechx'.format(API_URL_TRANS_SERVER))
        resp = requests.post('http://{}/speechx'.format(API_URL_TRANS_SERVER), json=payload)

        print("amqp://user:user123@{}/".format(RABBITMQ_URL))
        # Perform connection
        connection = await connect(
            "amqp://guest:guest@{}/".format(RABBITMQ_URL)
        )
        # Creating a channel
        channel = await connection.channel()
        exchange = await channel.declare_exchange("my_exchange", durable=True)

        while (True):
            receive_audio = await websocket.receive_bytes()

            # Sending the message
            await exchange.publish(
                Message(receive_audio),
                routing_key=meeting_id,
            )

    except Exception as err:
        await channel.close()
        await connection.close()
        connections_ws.disconnect(websocket, conn_id)
        print(f"Disconnect: {err}")
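
The handler above opens a new RabbitMQ connection for every WebSocket. As a hedged sketch of one alternative (not a confirmed fix for the error in this question), a single connection could be created once and shared, with each WebSocket opening only its own channel. `SharedConnection` and its `factory` argument are hypothetical names introduced here; with aio_pika the factory could be something like `lambda: aio_pika.connect_robust(url)`, which is an assumption about how it would be wired in.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class SharedConnection:
    """Lazily create one connection and hand the same object to every caller.

    Hypothetical helper: `factory` is any coroutine function that opens a
    connection, e.g. `lambda: aio_pika.connect_robust(url)` (assumption).
    """

    def __init__(self, factory: Callable[[], Awaitable[Any]]) -> None:
        self._factory = factory
        self._connection: Optional[Any] = None
        self._lock: Optional[asyncio.Lock] = None

    async def get(self) -> Any:
        # Create the lock inside a coroutine so it belongs to the running loop.
        if self._lock is None:
            self._lock = asyncio.Lock()
        # The lock ensures concurrent callers do not each open a connection.
        async with self._lock:
            if self._connection is None:
                self._connection = await self._factory()
            return self._connection
```

In the handler this would replace the `connect(...)` call with `connection = await shared.get()`, and the cleanup would close only the channel, leaving the shared connection open.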
 

Problem

The problem is that when we have a long-lived WebSocket/RabbitMQ connection and 3-4 other short-lived connections from different clients in between, the WebSocket server sometimes fails to open a new connection to RabbitMQ when we expect it to connect.

WEBSOCKET_SERVER LOGS

ERROR:  Exception in ASGI application
Traceback (most recent call last):
  File "ws_server_fastapi.py", line 73, in websocket
    connection = await connect(
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 333, in connect
    await connection.connect(
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 120, in connect
    self.connection = await asyncio.wait_for(
  File "/opt/conda/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
    return await fut
  File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 105, in _make_connection
    connection = await aiormq.connect(self.url, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 542, in connect
    await connection.connect(client_properties or {})
  File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
    return await self.create_task(func(self, *args, **kwargs))
  File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
    return await self.task
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 249, in connect
    self.connection_tune = await self.__rpc(
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 200, in __rpc
    _, _, frame = await self.__receive_frame()
  File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
    frame_header = await self.reader.readexactly(1)
  File "/opt/conda/lib/python3.8/asyncio/streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "/opt/conda/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
    await self._waiter
  File "/opt/conda/lib/python3.8/asyncio/selector_events.py", line 848, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
 

RABBITMQ LOGS

2021-09-13 14:12:43.007265 00:00 [info] <0.18038.21> accepting AMQP connection <0.18038.21> (172.31.32.146:60714 -> 172.31.22.88:5672)
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> crasher:
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> initial call: aten_detector:init/1
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> pid: <0.18031.21>
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> registered_name: aten_detector 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> exception exit: {timeout, 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> {gen_server,call,
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> [aten_sink,get_failure_probabilities]}}
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in function gen_server:call/2 (gen_server.erl, line 239) 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in call from aten_detector:handle_info/2 (src/aten_detector.erl, line 103) 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in call from gen_server:try_dispatch/4 (gen_server.erl, line 695) 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in call from gen_server:handle_msg/6 (gen_server.erl, line 771) 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> ancestors: [aten_sup,<0.179.0>]
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> message_queue_len: 1
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> messages: [poll] 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> links: [<0.180.0>] 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> dictionary: [] 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> trap_exit: false 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> status: running 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> heap_size: 6772 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> stack_size: 29 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> reductions: 11485 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> neighbours: 
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> 
2021-09-13 14:12:48.145257 00:00 [erro] <0.18032.21> closing AMQP connection <0.18032.21> (172.31.17.69:46020 -> 172.31.22.88:5672): 
2021-09-13 14:12:48.145257 00:00 [erro] <0.18032.21> {handshake_timeout,frame_header}
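
A note on the broker log above: `{handshake_timeout,frame_header}` indicates that RabbitMQ closed the connection because the client did not complete the AMQP handshake in time. One possible factor, stated as an assumption rather than a confirmed diagnosis, is the synchronous `requests.post` call inside the async handler: while it waits, the event loop cannot service other coroutines, including a handshake that another WebSocket has already started. The sketch below uses only the standard library to show the effect and one way around it with `loop.run_in_executor`; `blocking_call`, `heartbeat`, `run`, and `measure` are illustrative names, and `time.sleep` stands in for the HTTP request.

```python
import asyncio
import time


def blocking_call(seconds: float) -> str:
    """Stand-in for a synchronous HTTP request such as requests.post."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float, count: int) -> None:
    """Stand-in for other coroutines that need the loop (e.g. a handshake)."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def run(offload: bool) -> float:
    """Return the largest gap between heartbeat ticks while the call runs."""
    loop = asyncio.get_running_loop()
    ticks = [time.monotonic()]
    beat = asyncio.ensure_future(heartbeat(ticks, 0.01, 20))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    if offload:
        # Run the blocking function in the default thread pool executor,
        # so the event loop stays free for other coroutines.
        await loop.run_in_executor(None, blocking_call, 0.3)
    else:
        # Called directly, the function blocks the whole event loop.
        blocking_call(0.3)
    await beat
    return max(b - a for a, b in zip(ticks, ticks[1:]))


def measure(offload: bool) -> float:
    return asyncio.run(run(offload))
```

Calling `measure(False)` should report a stall of roughly the full blocking time, while `measure(True)` should stay close to the heartbeat interval. In the handler, the analogous change would be to wrap the `requests.post` call in `run_in_executor` or to use an async HTTP client.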
 

Could you please help us fix this issue?