#python #multithreading #websocket #server
#python #многопоточность #websocket #сервер
Вопрос:
У меня есть сервер, который собирает данные с нескольких GPS-трекеров, и я хочу отправлять эти данные в режиме реального времени X подключенным клиентам через WebSockets. Трекеры подключаются по протоколу TCP (каждый в своем потоке) и регулярно отправляют данные на сервер. Данные объединяются в вызываемый поток data_merger
и помещаются в эти потоки queue()
. Этот механизм работает хорошо и по назначению, однако я сталкиваюсь с проблемами, когда хочу отправить эти данные в соединения websocket.
Я попытался основать свое решение на примере синхронизации websocket, так как это, похоже, применимо к моему usecase. У меня есть вызываемый поток outbound_worker
, который обрабатывает код websocket. От thread.run()
:
def run(self):
self.data_merger.name = 'data_merger'
self.data_merger.start()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
start_server = websockets.serve(self.handle_clients, 'localhost', self.port)
print("WebSocker server started for port %s at %s" % (self.port, datetime.now()))
loop.run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Затем метод обработчика для сервера websocket:
async def handle_clients(self, websocket, path):
while True:
try:
# Register the websocket to connected set
await self.register(websocket)
data = self.data_merger.queue.get()
await send_to_clients(data)
await asyncio.sleep(0.1)
except websockets.ConnectionClosed:
print("Connection closed")
await self.unregister(websocket)
break
async def send_to_clients(self, data):
data = json.dumps(data)
if self.connected:
await asyncio.wait([ws.send(data) for ws in self.connected])
Методы register()
и unregister()
идентичны примеру, который я привел выше. Клиент, который я использую, представляет собой базовый цикл, который печатает полученные данные:
async def hello():
uri = "ws://localhost:64000"
while True:
async with websockets.connect(uri) as websocket:
print("Awaiting data...")
data = await websocket.recv()
#print(f"{data}")
print(f"{json.loads(data)}")
asyncio.get_event_loop().run_until_complete(hello())
Поскольку я новичок в асинхронных вызовах в Python и websockets в целом, я не уверен, что мой подход здесь правильный. Поскольку я пытаюсь отправить данные сразу после регистрации нового соединения, код, похоже, останавливается на await send_to_clients(data)
строке. Должен ли я скорее обработать это в data_merger
потоке и передать connected
set?
Другая проблема заключается в том, что если я просто использую client_handler
to register()
и unregister()
новые подключения, кажется, что это просто перебирает register()
часть, и я не могу подключить второго клиента.
Я думаю, мои вопросы можно свести к следующему:
- Как мне принимать и управлять несколькими открытыми соединениями через websocket, аналогично многопоточному серверу сокетов?
- Есть ли способ вызвать вызов функции (например,
register()
только для новых подключений к websocket, аналогичноsocket.listen()
иsocket.accept()
?
Комментарии:
1. Вероятно, вам следует изучить «брокеры сообщений» и шаблоны pub / sub (публикация / подписка). Вы не только дублируете большую часть существующей работы, но и выполняете ее неоптимальным образом, поэтому по мере роста вашей инфраструктуры вы столкнетесь с проблемами масштабирования.
2. Спасибо, мне просто нужен указатель в правильном направлении. Я проверю это