Отправка данных в несколько подключений websocket в Python

#python #multithreading #websocket #server

#python #многопоточность #websocket #сервер

Вопрос:

У меня есть сервер, который собирает данные с нескольких GPS-трекеров, и я хочу отправлять эти данные в режиме реального времени X подключенным клиентам через WebSockets. Трекеры подключаются по протоколу TCP (каждый в своем потоке) и регулярно отправляют данные на сервер. Данные объединяются в вызываемый поток data_merger и помещаются в эти потоки queue() . Этот механизм работает хорошо и по назначению, однако я сталкиваюсь с проблемами, когда хочу отправить эти данные в соединения websocket.

Я попытался основать свое решение на примере синхронизации websocket, так как это, похоже, применимо к моему usecase. У меня есть вызываемый поток outbound_worker , который обрабатывает код websocket. От thread.run() :

 def run(self):
    self.data_merger.name = 'data_merger'
    self.data_merger.start()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    start_server = websockets.serve(self.handle_clients, 'localhost', self.port)
    print("WebSocker server started for port %s at %s" % (self.port, datetime.now()))
    loop.run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
 

Затем метод обработчика для сервера websocket:

 async def handle_clients(self, websocket, path):
    while True:
        try:
            # Register the websocket to connected set
            await self.register(websocket)
            data = self.data_merger.queue.get()
            await send_to_clients(data)
            await asyncio.sleep(0.1)
        except websockets.ConnectionClosed:
            print("Connection closed")
            await self.unregister(websocket)
            break

async def send_to_clients(self, data):
    data = json.dumps(data)
    if self.connected:
        await asyncio.wait([ws.send(data) for ws in self.connected])
 

Методы register() и unregister() идентичны примеру, который я привел выше. Клиент, который я использую, представляет собой базовый цикл, который печатает полученные данные:

 async def hello():
    uri = "ws://localhost:64000"
    while True:
        async with websockets.connect(uri) as websocket:
            print("Awaiting data...")
            data = await websocket.recv()
            
            #print(f"{data}")
            print(f"{json.loads(data)}")


asyncio.get_event_loop().run_until_complete(hello())
 

Поскольку я новичок в асинхронных вызовах в Python и websockets в целом, я не уверен, что мой подход здесь правильный. Поскольку я пытаюсь отправить данные сразу после регистрации нового соединения, код, похоже, останавливается на await send_to_clients(data) строке. Должен ли я скорее обработать это в data_merger потоке и передать connected set?

Другая проблема заключается в том, что если я просто использую client_handler to register() и unregister() новые подключения, кажется, что это просто перебирает register() часть, и я не могу подключить второго клиента.

Я думаю, мои вопросы можно свести к следующему:

  • Как мне принимать и управлять несколькими открытыми соединениями через websocket, аналогично многопоточному серверу сокетов?
  • Есть ли способ вызвать вызов функции (например, register() только для новых подключений к websocket, аналогично socket.listen() и socket.accept() ?

Комментарии:

1. Вероятно, вам следует изучить «брокеры сообщений» и шаблоны pub / sub (публикация / подписка). Вы не только дублируете большую часть существующей работы, но и выполняете ее неоптимальным образом, поэтому по мере роста вашей инфраструктуры вы столкнетесь с проблемами масштабирования.

2. Спасибо, мне просто нужен указатель в правильном направлении. Я проверю это