#python #websocket #rabbitmq #pika
Вопрос:
У нас есть сервер websocket, который выполняет следующие действия-
- Используйте регистр — Потоковые аудиоданные отправляются с веб-интерфейса на этот сервер через Websocket
- Сервер Websocket использует aio_pika для подключения к RabbitMQ из нашего клиента Python и публикации аудиоданных
- Подключения RabbitMQ открываются для каждого нового подключения к веб — сокету
WEBSOCKET_CODE
@app.websocket('/ws')
async def websocket(websocket: WebSocket, meeting_id: str, user_id: str):
global connections_ws, counter, audio
conn_id = meeting_id
print('before try')
try:
if not conn_id or conn_id == "-1":
await websocket.close()
print('before')
await connections_ws.connect(websocket, conn_id)
print('after')
# hard coded user-id for now
payload = {'user_id': user_id,
'meeting_id': meeting_id}
print('http://{}/speechx'.format(API_URL_TRANS_SERVER))
resp = requests.post('http://{}/speechx'.format(API_URL_TRANS_SERVER), json=payload)
print("amqp://user:user123@{}/".format(RABBITMQ_URL))
# Perform connection
connection = await connect(
"amqp://guest:guest@{}/".format(RABBITMQ_URL)
)
# Creating a channel
channel = await connection.channel()
exchange = await channel.declare_exchange("my_exchange", durable=True)
while (True):
receive_audio = await websocket.receive_bytes()
# Sending the message
await exchange.publish(
Message(receive_audio),
routing_key=meeting_id,
)
except Exception as err:
await channel.close()
await connection.close()
connections_ws.disconnect(websocket, conn_id)
print(f"Disconnect: {err}")
проблема
Проблема в том, что если у нас есть длительное соединение websocket или rabbitmq и 3-4 других недолговечных соединения от разных клиентов между ними, иногда сервер websocket не сможет принимать новые подключения к Rabbit MQ, когда мы ожидаем подключения.
ЖУРНАЛЫ WEBSOCKET_SERVER
ERROR: Exception in ASGI application Traceback (most recent call last):
File "ws_server_fastapi.py", line 73, in websocket
connection = await connect(
File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 333, in connect
await connection.connect(
File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 120, in connect
self.connection = await asyncio.wait_for(
File "/opt/conda/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
return await fut
File "/opt/conda/lib/python3.8/site-packages/aio_pika/connection.py", line 105, in _make_connection
connection = await aiormq.connect(self.url, **kwargs) File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 542, in connect
await connection.connect(client_properties or {})
File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 168, in wrap
return await self.create_task(func(self, *args, **kwargs))
File "/opt/conda/lib/python3.8/site-packages/aiormq/base.py", line 25, in __inner
return await self.task
File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 249, in connect
self.connection_tune = await self.__rpc(
File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 200, in __rpc
_, _, frame = await self.__receive_frame()
File "/opt/conda/lib/python3.8/site-packages/aiormq/connection.py", line 327, in __receive_frame
frame_header = await self.reader.readexactly(1)
File "/opt/conda/lib/python3.8/asyncio/streams.py", line 723, in readexactly
await self._wait_for_data('readexactly')
File "/opt/conda/lib/python3.8/asyncio/streams.py", line 517, in _wait_for_data
await self._waiter
File "/opt/conda/lib/python3.8/asyncio/selector_events.py", line 848, in _read_ready__data_received
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
RABBITMQ LOGS
2021-09-13 14:12:43.007265 00:00 [info] <0.18038.21> accepting AMQP connection <0.18038.21> (172.31.32.146:60714 -> 172.31.22.88:5672)
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> crasher:
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> initial call: aten_detector:init/1 2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> pid: <0.18031.21>
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> registered_name: aten_detector
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> exception exit: {timeout,
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> {gen_server,call, 2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> [aten_sink,get_failure_probabilities]}}
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in function gen_server:call/2 (gen_server.erl, line 239)
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in call from aten_detector:handle_info/2 (src/aten_detector.erl, line 103)
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in call from gen_server:try_dispatch/4 (gen_server.erl, line 695)
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> in call from gen_server:handle_msg/6 (gen_server.erl, line 771)
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> ancestors: [aten_sup,<0.179.0>] 2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> message_queue_len: 1
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> messages: [poll]
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> links: [<0.180.0>]
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> dictionary: []
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> trap_exit: false
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> status: running
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> heap_size: 6772
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> stack_size: 29
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> reductions: 11485
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21> neighbours:
2021-09-13 14:12:21.309372 00:00 [erro] <0.18031.21>
2021-09-13 14:12:48.145257 00:00 [erro] <0.18032.21> closing AMQP connection <0.18032.21> (172.31.17.69:46020 -> 172.31.22.88:5672):
2021-09-13 14:12:48.145257 00:00 [erro] <0.18032.21> {handshake_timeout,frame_header}
Can you pls help us in fixing the issue?