Потоковая передача данных с нескольких клиентов на сервер с помощью aiortc

#python-3.x #websocket #multiprocessing #webrtc #aiortc

Вопрос:

Пытаясь создать много-много потоковой архитектуры, мы намерены использовать WebRTC для потоковой передачи и WebSockets для сигнализации. Мы планируем создать PeerConnection от каждого клиента к серверу и поместить данные, полученные через каждый поток, в очередь RabbitMQ и передавать данные другим клиентам

Как мы можем этого достичь? Как мы используем модуль мультипроцессорной обработки для создания нового экземпляра сервера, когда новый клиент хочет передавать данные потоком. Я предполагаю, что у нас должен быть только один сигнальный сервер и множество одноранговых соединений, основанных на отсутствии клиентов.

signaling.py

 class WebSocket:  def __init__(self, handler, host='0.0.0.0', port=8765):  self.host = host  self.port = port  self.handler = handler   async def serve(self):  print('starting server')  async with websockets.serve(self.handler, self.host, self.port):  await asyncio.Future()   if __name__ == '__main__':  server = Server() # Server() instance has to dynamically created  socket = WebSocket(handler=server.handler) # A dynamic handler?   try:  asyncio.get_event_loop().run_until_complete(socket.serve())  except KeyboardInterrupt:  pass  finally:  asyncio.get_event_loop().close()  

server.py

 class Server:  def __init__(self, host='0.0.0.0', port=8765):  self.host = host  self.port = port  self.signaling = WebSocketSignaling(host, port)  self.pc = RTCPeerConnection()   async def consume_signaling(self):  while True:  obj = await self.signaling.receive()  if isinstance(obj, RTCSessionDescription):  await self.pc.setRemoteDescription(obj)  if obj.type == "offer":  await self.pc.setLocalDescription(await self.pc.createAnswer())  await self.signaling.send(self.pc.localDescription)  elif isinstance(obj, RTCIceCandidate):  await self.pc.addIceCandidate(obj)  elif obj is BYE:  print("Exiting")  break   async def handler(self, websocket, path):  await self.signaling.connect(websocket)  pc = self.pc   @pc.on("datachannel")  def on_datachannel(channel):  @channel.on("message")  async def on_message(message):  # parse chunks of data and add to queue    await self.consume_signaling()