#python-3.x #websocket #multiprocessing #webrtc #aiortc
Вопрос:
Пытаясь создать много-много потоковой архитектуры, мы намерены использовать WebRTC для потоковой передачи и WebSockets для сигнализации. Мы планируем создать PeerConnection от каждого клиента к серверу и поместить данные, полученные через каждый поток, в очередь RabbitMQ и передавать данные другим клиентам
Как мы можем этого достичь? Как мы используем модуль мультипроцессорной обработки для создания нового экземпляра сервера, когда новый клиент хочет передавать данные потоком. Я предполагаю, что у нас должен быть только один сигнальный сервер и множество одноранговых соединений, основанных на отсутствии клиентов.
signaling.py
class WebSocket: def __init__(self, handler, host='0.0.0.0', port=8765): self.host = host self.port = port self.handler = handler async def serve(self): print('starting server') async with websockets.serve(self.handler, self.host, self.port): await asyncio.Future() if __name__ == '__main__': server = Server() # Server() instance has to dynamically created socket = WebSocket(handler=server.handler) # A dynamic handler? try: asyncio.get_event_loop().run_until_complete(socket.serve()) except KeyboardInterrupt: pass finally: asyncio.get_event_loop().close()
server.py
class Server: def __init__(self, host='0.0.0.0', port=8765): self.host = host self.port = port self.signaling = WebSocketSignaling(host, port) self.pc = RTCPeerConnection() async def consume_signaling(self): while True: obj = await self.signaling.receive() if isinstance(obj, RTCSessionDescription): await self.pc.setRemoteDescription(obj) if obj.type == "offer": await self.pc.setLocalDescription(await self.pc.createAnswer()) await self.signaling.send(self.pc.localDescription) elif isinstance(obj, RTCIceCandidate): await self.pc.addIceCandidate(obj) elif obj is BYE: print("Exiting") break async def handler(self, websocket, path): await self.signaling.connect(websocket) pc = self.pc @pc.on("datachannel") def on_datachannel(channel): @channel.on("message") async def on_message(message): # parse chunks of data and add to queue await self.consume_signaling()