Запуск и ожидание асинхронной функции из синхронной с использованием Python asyncio

#python-3.x #asynchronous #python-asyncio

#python-3.x #асинхронный #python-asyncio

Вопрос:

В моем коде у меня есть класс со свойствами, которым иногда требуется запускать асинхронный код. Иногда мне нужно получить доступ к свойству из асинхронной функции, иногда из синхронной — вот почему я не хочу, чтобы мои свойства были асинхронными. Кроме того, у меня сложилось впечатление, что асинхронные свойства вообще — это запах кода. Поправьте меня, если я ошибаюсь.

У меня проблема с выполнением асинхронного метода из свойства synchronous и блокировкой дальнейшего выполнения до завершения работы асинхронного метода.

Вот пример кода:

 import asyncio


async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')


def synchronous_property():
    print('entering synchronous_property')
    loop = asyncio.get_event_loop()
    try:
        # this will raise an exception, so I catch it and ignore
        loop.run_until_complete(asynchronous())
    except RuntimeError:
        pass
    print('exiting synchronous_property')


async def asynchronous():
    print('entering asynchronous')
    print('exiting asynchronous')


asyncio.run(main())
  

Ее вывод:

 entering main
entering synchronous_property
exiting synchronous_property
exiting main
entering asynchronous
exiting asynchronous
  

Во-первых, RuntimeError захват кажется неправильным, но если я этого не сделаю, я получу RuntimeError: This event loop is already running исключение.

Во-вторых, asynchronous() функция выполняется последней, после завершения синхронной. Я хочу выполнить некоторую обработку набора данных асинхронным методом, поэтому мне нужно дождаться его завершения. Если я добавлю ее await asyncio.sleep(0) после вызова synchronous_property() , она вызовется asynchronous() перед main() завершением, но мне это не поможет. Мне нужно запустить asynchronous() до synchronous_property() завершения.

Чего мне не хватает? Я использую Python 3.7.

Комментарии:

1. main () и asynchronous () оба являются асинхронными. synchronous_property() является sync, но вызывается в async. Таким образом, группировка вашей печати выглядит правильно. Ошибка, которую вы улавливаете, предупреждает вас о том, что вы пытаетесь создать дополнительный цикл событий, который является критичным для всей проблемы.

2. Да, я в курсе того факта, что вывод правильный. Я хочу выполнить асинхронный вызов из sync и заблокировать его выполнение.

Ответ №1:

Asyncio действительно настаивает на том, чтобы не допускать вложенных циклов, по дизайну. Однако вы всегда можете запустить другой цикл событий в другом потоке. Вот вариант, который использует пул потоков, чтобы избежать необходимости каждый раз создавать новый поток:

 import asyncio, concurrent.futures

async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')

pool = concurrent.futures.ThreadPoolExecutor()

def synchronous_property():
    print('entering synchronous_property')
    result = pool.submit(asyncio.run, asynchronous()).result()
    print('exiting synchronous_property', result)

async def asynchronous():
    print('entering asynchronous')
    await asyncio.sleep(1)
    print('exiting asynchronous')
    return 42

asyncio.run(main())
  

Этот код создает новый цикл событий на каждой границе синхронизация-> асинхронность, поэтому не ожидайте высокой производительности, если вы делаете это часто. Это можно было бы улучшить, создав только один цикл событий для каждого потока с помощью asyncio.new_event_loop и кэшировав его в локальной переменной потока.

Комментарии:

1. Просто вызвать функцию синхронизации, используя исполнитель пула потоков? Один цикл в одном процессе с несколькими потоками для обработки функции синхронизации?

2. @Craftables Операционная система хочет, чтобы их функции синхронизации могли произвольно вызывать асинхронные функции. Поскольку функции синхронизации сами вызываются из async, это является вложенностью, и это не будет работать с одним циклом событий.

3. Хорошо, итак, в принципе, нет другого чистого решения, кроме как использовать асинхронные функции повсюду. Всегда существует опасность того, что n функций глубже, мне нужно будет вызывать async, а единственная неасинхронная функция в цепочке сделает это очень неудобным. Приятно знать, спасибо.

4. @AndrzejKlajnert Точно, предположение о том, что все асинхронно, глубоко внедрено, потому что каждая сопрограмма должна иметь возможность приостанавливать весь путь до цикла событий. Циклы рекурсивных событий не работают, потому что рекурсивный цикл означает, что новые задачи будут выполняться до завершения или приостановки текущей задачи, что означает, что она находится в несогласованном состоянии. Невозможность вызвать async для синхронизации иногда называют проблемой с цветом функции.

5. @unhammer, Потому что это возникло бы RuntimeError при synchronous_property() вызове из асинхронного кода — циклы asyncio не вложены. У OP есть довольно специфическое требование для выполнения асинхронного кода из контекста синхронизации, даже когда этот контекст синхронизации сам вызывается из async. Это не то, для чего был разработан asyncio, поэтому необходим взлом (обходной путь) с использованием потоков.

Ответ №2:

Самый простой способ — использовать существующее «колесо», например asgiref.async_to_sync

 from asgiref.sync import async_to_sync
  

затем:

 async_to_sync(main)()
  

в общем:

 async_to_sync(<your_async_func>)(<.. arguments for async function ..>)
  

Это вызывающий класс, который включает ожидаемый, который работает только в потоке с
цикл событий в синхронный вызываемый объект, который работает в подпотоке.

Если стек вызовов содержит асинхронный цикл, код выполняется там. В противном случае код выполняется в новом цикле в новом потоке.

В любом случае, этот поток затем приостанавливается и ожидает запуска любого кода thread_sensitive, вызываемого из стека вызовов ниже с использованием SyncToAsync, прежде чем окончательно завершиться, как только асинхронная задача вернется.

Ответ №3:

Похоже, проблема с вопросом, как указано. Повторяя вопрос: как взаимодействовать между потоком (не содержащим асинхронных процессов и, следовательно, считающимся синхронизированным) и асинхронным процессом (выполняющимся в некотором цикле событий). Один из подходов заключается в использовании двух очередей синхронизации. Процесс синхронизации помещает свой запрос / параметры в QtoAsync и ожидает QtoSync. Асинхронный процесс считывает QtoAsync БЕЗ ожидания, и если он находит запрос / параметры, выполняет запрос и помещает результат в QtoSync.

 import queue
QtoAsync = queue.Queue()
QtoSync = queue.Queue()
...

async def asyncProc():
    while True:
        try:
            data=QtoAsync.get_nowait()
            result = await <the async that you wish to execute>
            QtoAsync.put(result) #This can block if queue is full. you can use put_nowait and handle the exception.
        except queue.Empty:
            await asyncio.sleep(0.001) #put a nominal delay forcing this to wait in event loop
....
#start the sync process in a different thread here..
asyncio.run(main()) #main invokes the async tasks including the asyncProc

The sync thread puts it request to async using:
req = <the async that you wish to execute>
QtoAsync.put(req)
result = QtoSync.get()
  

Это должно сработать.

Проблема с вопросом, как указано: 1. При запуске асинхронных процессов с помощью блоков выполнения asyncio.run (или аналогичных) до завершения асинхронных процессов. Перед вызовом asyncio необходимо явно запустить отдельный поток синхронизации.выполнить 2. В общем случае процессы asyncio зависят от других процессов asyncio в этом цикле. Таким образом, прямой вызов асинхронного процесса из другого потока не разрешен. Взаимодействие должно осуществляться с циклом событий, и использование двух очередей — это один из подходов.

Ответ №4:

Я хочу выполнить асинхронный вызов из sync и заблокировать его выполнение

Просто сделайте функцию синхронизации асинхронной и ожидайте асинхронную функцию. Асинхронные функции ничем не отличаются от обычных функций, и вы можете поместить в них любой код, который захотите. Если у вас все еще есть проблема, измените свой вопрос, используя фактический код, который вы пытаетесь запустить.

 import asyncio


async def main():
    print('entering main')
    await synchronous_property()
    print('exiting main')


async def synchronous_property():
    print('entering synchronous_property')

    await asynchronous()

    # Do whatever sync stuff you want who cares

    print('exiting synchronous_property')


async def asynchronous():
    print('entering asynchronous')
    print('exiting asynchronous')


asyncio.run(main())
  

Комментарии:

1. В этом суть — я не хочу, чтобы мое свойство было асинхронным. Она также выполняется некоторым синхронным кодом, я не хочу, чтобы весь мой код содержал асинхронные функции.