Запуск цикла асинхронного сервера Python и в будущем одновременно

#python-3.x #asynchronous #python-asyncio #pywin32 #python-sockets

#python-3.x #асинхронный #python-asyncio #pywin32 #python-сокеты

Вопрос:

Я работаю над проектом, в котором у меня есть простой сервер сокетов, ищущий соединения, чтобы ответить клиенту сообщением в очереди сообщений. В то же время у меня есть система мониторинга, использующая PyWin32, которая ищет изменения в файлах в каталоге. Когда PyWin32 обнаруживает изменение файла, он должен добавить информацию в очередь сообщений. Когда сервер обнаруживает соединение, он должен извлечь соответствующую информацию из очереди сообщений. Сервер будет постоянно искать соединения, в то время как PyWin32 запускается только при обнаружении изменений.

Я могу успешно запускать их по отдельности, но не вместе. Похоже, что мой код зависает на одной функции, а другая прекращает обработку. Я прочитал документацию и просмотрел сообщения SO, но, похоже, ничто не решает мою проблему.

Вот пример сервера сокетов:

 class TestSocketServer:  def __init__(self,  # response_handler: Callable  ):  self._alive: bool = False   # Configure client-server connection  self._host: str = 'localhost'  self._port: int = 50000   self._mq = {}   def _respond(self, reader):  request = self._mq[reader].get()   return f"Got request {request}"   async def _client_connected(self, reader, writer):  data = await reader.read(1024)   # print(reader, type(reader))  message = data.decode()  print(message)   self._mq[reader]: queue.Queue = queue.Queue()  self._mq[reader].put_nowait(message)   response = self._respond(reader)   writer.write(response.encode())  await writer.drain()   writer.close()   async def handle_traffic(self):  server = await asyncio.start_server(  self._client_connected,  self._host,  self._port)   await server.serve_forever()   def start(self):  asyncio.run(self.handle_traffic())  

Обработчик PyWin32 выглядит следующим образом:

 async def _monitor(self):  """  Look for changes in directory and return them  :return: FILE_NOTIFY_INFORMATION  """  ReadDirectoryChangesW(self._handle, # Python version of Windows Handle  self._buffer,  TRUE, # Monitor directory tree  # Attributes to monitor  (FILE_NOTIFY_CHANGE_SIZE |  FILE_NOTIFY_CHANGE_ATTRIBUTES |  FILE_NOTIFY_CHANGE_DIR_NAME |  FILE_NOTIFY_CHANGE_FILE_NAME |  FILE_NOTIFY_CHANGE_LAST_WRITE)  )   change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)   action, item = change[0]  act_str = EventWatcher._ACTIONS.get(action)   if change:  chg_msg = {str(time.time()): [str(act_str), item]}  # TODO try to push changes through socket before persisting  await self._write_changes(action=action, change=chg_msg)  async.run(self._monitor)  

Как я пытался их реализовать (что исключает вызовы async.run ()):

 loop = asyncio.new_event_loop() loop.create_task(self.handle_traffic()) loop.create_task(self.monitor()) loop.run_forever()  

Я также попытался создать будущее в своем цикле для части мониторинга:

 loop = asyncio.new_event_loop() loop.create_task(self.handle_traffic()) fut = loop.create_future() loop.create_task(self.monitor(fut)) loop.run_forever()  

Комментарии:

1. Было бы полезно посмотреть, как вы их реализовали вместе. Из того, что вы показываете, я предполагаю, что вы неправильно используете » asyncio.run ()». Предполагается, что это ваша точка входа для всей вашей асинхронной программы, поэтому не используйте ее в методе.

2. Попался. Я устал от нескольких реализаций, но безуспешно, но позвольте мне включить то, что я пробовал.

3. @thisisalsomypassword добавлено. При использовании циклов событий я сталкиваюсь с той же проблемой, когда одна сопрограмма, похоже, блокирует другую.

4. Понятно, но это невозможно без потоков. Если вы абсолютно не хотите этого, вы также можете просто создать своего рода микросервис и передавать события fs через сокет.

5. Это определенно сработало бы, хотя в основном это то, что я уже делаю, и я бы хотел избежать добавления еще одного слоя, если это возможно. В этом случае резьба должна работать нормально.

Ответ №1:

Спасибо @thisisalsomypassword за указание на то, что мои вызовы Win32 API блокировали остальную часть моего скрипта и что мне нужно было бы использовать потоки. Выполнение этой нижеприведенной реализации устранило мою проблему:

 class TestSocketServer:  def __init__(self,  # response_handler: Callable  ):  self._alive: bool = False   # Configure client-server connection  self._host: str = 'localhost'  self._port: int = 50000   self._mq = {}   def _respond(self, reader):  request = self._mq[reader].get()   return f"Got request {request}"   async def _client_connected(self, reader, writer):  data = await reader.read(1024)   # print(reader, type(reader))  message = data.decode()  print(message)   self._mq[reader]: queue.Queue = queue.Queue()  self._mq[reader].put_nowait(message)   response = self._respond(reader)   writer.write(response.encode())  await writer.drain()   writer.close()   async def start_serving(self):  await asyncio.start_server(  self._client_connected,  self._host,  self._port)  

Для наблюдателя:

 def monitor(self):  """  Look for changes in directory and return them  :return: FILE_NOTIFY_INFORMATION  """  while True:  ReadDirectoryChangesW(self._handle, # Python version of Windows Handle  self._buffer,  TRUE, # Monitor directory tree  # Attributes to monitor  (FILE_NOTIFY_CHANGE_SIZE |  FILE_NOTIFY_CHANGE_ATTRIBUTES |  FILE_NOTIFY_CHANGE_DIR_NAME |  FILE_NOTIFY_CHANGE_FILE_NAME |  FILE_NOTIFY_CHANGE_LAST_WRITE)  )   change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)   action, item = change[0]  print(change)  

Затем реализация этих двух способов работает следующим образом:

 self._server = TestSocketServer() self._watcher = EventWatcher()  def main(self):  loop = asyncio.get_event_loop()  loop.create_task(asyncio.to_thread(self._watcher.monitor))  loop.create_task(self._server.start_serving())  loop.run_forever()  main()  

Теперь, всякий раз, когда есть запрос сокета или изменение в файловой системе, я вижу, как появляются оба, и программа продолжает, как и ожидалось, обрабатывать будущие входные данные.

Комментарии:

1. asyncio.to_thread() это было ново для меня. Так что спасибо за это!

2. С удовольствием! Я работаю в основном специалистом по обработке данных, так что все это было далеко за пределами моей рулевой рубки XD еще раз спасибо за помощь!