#python-3.x #asynchronous #python-asyncio #pywin32 #python-sockets
#python-3.x #асинхронный #python-asyncio #pywin32 #python-сокеты
Вопрос:
Я работаю над проектом, в котором у меня есть простой сервер сокетов, ищущий соединения, чтобы ответить клиенту сообщением в очереди сообщений. В то же время у меня есть система мониторинга, использующая PyWin32, которая ищет изменения в файлах в каталоге. Когда PyWin32 обнаруживает изменение файла, он должен добавить информацию в очередь сообщений. Когда сервер обнаруживает соединение, он должен извлечь соответствующую информацию из очереди сообщений. Сервер будет постоянно искать соединения, в то время как PyWin32 запускается только при обнаружении изменений.
Я могу успешно запускать их по отдельности, но не вместе. Похоже, что мой код зависает на одной функции, а другая прекращает обработку. Я прочитал документацию и просмотрел сообщения SO, но, похоже, ничто не решает мою проблему.
Вот пример сервера сокетов:
class TestSocketServer: def __init__(self, # response_handler: Callable ): self._alive: bool = False # Configure client-server connection self._host: str = 'localhost' self._port: int = 50000 self._mq = {} def _respond(self, reader): request = self._mq[reader].get() return f"Got request {request}" async def _client_connected(self, reader, writer): data = await reader.read(1024) # print(reader, type(reader)) message = data.decode() print(message) self._mq[reader]: queue.Queue = queue.Queue() self._mq[reader].put_nowait(message) response = self._respond(reader) writer.write(response.encode()) await writer.drain() writer.close() async def handle_traffic(self): server = await asyncio.start_server( self._client_connected, self._host, self._port) await server.serve_forever() def start(self): asyncio.run(self.handle_traffic())
Обработчик PyWin32 выглядит следующим образом:
async def _monitor(self): """ Look for changes in directory and return them :return: FILE_NOTIFY_INFORMATION """ ReadDirectoryChangesW(self._handle, # Python version of Windows Handle self._buffer, TRUE, # Monitor directory tree # Attributes to monitor (FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE) ) change = FILE_NOTIFY_INFORMATION(self._buffer, 9999) action, item = change[0] act_str = EventWatcher._ACTIONS.get(action) if change: chg_msg = {str(time.time()): [str(act_str), item]} # TODO try to push changes through socket before persisting await self._write_changes(action=action, change=chg_msg) async.run(self._monitor)
Как я пытался их реализовать (что исключает вызовы async.run ()):
loop = asyncio.new_event_loop() loop.create_task(self.handle_traffic()) loop.create_task(self.monitor()) loop.run_forever()
Я также попытался создать будущее в своем цикле для части мониторинга:
loop = asyncio.new_event_loop() loop.create_task(self.handle_traffic()) fut = loop.create_future() loop.create_task(self.monitor(fut)) loop.run_forever()
Комментарии:
1. Было бы полезно посмотреть, как вы их реализовали вместе. Из того, что вы показываете, я предполагаю, что вы неправильно используете » asyncio.run ()». Предполагается, что это ваша точка входа для всей вашей асинхронной программы, поэтому не используйте ее в методе.
2. Попался. Я устал от нескольких реализаций, но безуспешно, но позвольте мне включить то, что я пробовал.
3. @thisisalsomypassword добавлено. При использовании циклов событий я сталкиваюсь с той же проблемой, когда одна сопрограмма, похоже, блокирует другую.
4. Понятно, но это невозможно без потоков. Если вы абсолютно не хотите этого, вы также можете просто создать своего рода микросервис и передавать события fs через сокет.
5. Это определенно сработало бы, хотя в основном это то, что я уже делаю, и я бы хотел избежать добавления еще одного слоя, если это возможно. В этом случае резьба должна работать нормально.
Ответ №1:
Спасибо @thisisalsomypassword за указание на то, что мои вызовы Win32 API блокировали остальную часть моего скрипта и что мне нужно было бы использовать потоки. Выполнение этой нижеприведенной реализации устранило мою проблему:
class TestSocketServer: def __init__(self, # response_handler: Callable ): self._alive: bool = False # Configure client-server connection self._host: str = 'localhost' self._port: int = 50000 self._mq = {} def _respond(self, reader): request = self._mq[reader].get() return f"Got request {request}" async def _client_connected(self, reader, writer): data = await reader.read(1024) # print(reader, type(reader)) message = data.decode() print(message) self._mq[reader]: queue.Queue = queue.Queue() self._mq[reader].put_nowait(message) response = self._respond(reader) writer.write(response.encode()) await writer.drain() writer.close() async def start_serving(self): await asyncio.start_server( self._client_connected, self._host, self._port)
Для наблюдателя:
def monitor(self): """ Look for changes in directory and return them :return: FILE_NOTIFY_INFORMATION """ while True: ReadDirectoryChangesW(self._handle, # Python version of Windows Handle self._buffer, TRUE, # Monitor directory tree # Attributes to monitor (FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE) ) change = FILE_NOTIFY_INFORMATION(self._buffer, 9999) action, item = change[0] print(change)
Затем реализация этих двух способов работает следующим образом:
self._server = TestSocketServer() self._watcher = EventWatcher() def main(self): loop = asyncio.get_event_loop() loop.create_task(asyncio.to_thread(self._watcher.monitor)) loop.create_task(self._server.start_serving()) loop.run_forever() main()
Теперь, всякий раз, когда есть запрос сокета или изменение в файловой системе, я вижу, как появляются оба, и программа продолжает, как и ожидалось, обрабатывать будущие входные данные.
Комментарии:
1.
asyncio.to_thread()
это было ново для меня. Так что спасибо за это!2. С удовольствием! Я работаю в основном специалистом по обработке данных, так что все это было далеко за пределами моей рулевой рубки XD еще раз спасибо за помощь!