Запускайте задачи блокировки и разблокировки вместе с asyncio

#python #async-await #python-asyncio

#python #асинхронный режим ожидания #python-asyncio

Вопрос:

Я хочу запускать задачи блокировки и разблокировки вместе асинхронно. Очевидно, что для блокировки задач из asyncio необходимо использовать метод run_in_executor. Вот мой пример кода:

 import asyncio
import concurrent.futures
import datetime
import time


def blocking():
    print("Enter to blocking()", datetime.datetime.now().time())
    time.sleep(2)
    print("Exited from blocking()", datetime.datetime.now().time())


async def waiter():
    print("Enter to waiter()", datetime.datetime.now().time())
    await asyncio.sleep(3)
    print("Exit from waiter()", datetime.datetime.now().time())


async def asynchronous(loop):
    print("Create tasks", datetime.datetime.now().time())
    task_1 = asyncio.create_task(waiter())

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    task_2 = loop.run_in_executor(executor, blocking)

    tasks = [task_1, task_2]
    print("Tasks are created", datetime.datetime.now().time())
    await asyncio.wait(tasks)


if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asynchronous(loop))
    except (OSError) as exc:
        sys.exit('Exception: '   str(exc))
  

Должен ли я использовать тот же цикл событий для блокировки задачи в run_in_executor, или необходимо использовать другой? Что я должен изменить в своем коде, чтобы он работал асинхронно? Спасибо

Комментарии:

1. В дополнение к тому, что говорится в ответе, обратите внимание, что вам не нужно создавать новый пул потоков для каждого запуска. Целью пула потоков является повторное использование потоков для повышения эффективности. Вы можете просто передать None в качестве первого аргумента в run_in_executor и заставить его использовать пул потоков, созданный asyncio для цикла событий.

2. о, я понимаю. Но если я не буду использовать отдельные потоки из пользовательского потока, это заблокирует поток циклом событий, не так ли?

3. Я не говорю, что вы должны использовать пул потоков, я просто говорю, что вам не нужно создавать свой собственный пул потоков. При передаче None в run_in_executor будет использоваться собственный пул потоков asyncio, который существует именно для этой цели. Этот пул потоков по-прежнему имеет свои собственные потоки и не будет блокировать поток пула событий.

4. Теперь я понимаю. Меня просто смутила эта фраза из официальной документации: «Метод loop.run_in_executor() может использоваться с concurrent.futures. ThreadPoolExecutor для выполнения блокирующего кода в другом потоке ОС, не блокируя поток ОС, в котором выполняется цикл событий.» Я думал, что по умолчанию существует только один поток с циклом событий, и, используя свой собственный пул потоков, вы избежите блокировки основного потока с циклом событий

5. Вы правильно подумали, действительно есть только один поток, который запускает цикл событий. Этот пул потоков является вспомогательным средством, предусмотренным для таких ситуаций, как ваша, чтобы каждый не создавал свой собственный пул потоков и количество потоков не росло безгранично. Таким образом, это правда, что вы должны использовать пул потоков, но он не обязательно должен быть вашим собственным — пул потоков , предоставляемый asyncio (который отличается от потока, который запускает цикл событий), отлично подходит.

Ответ №1:

Вы должны использовать тот же цикл. Цикл делегирует исполнителю, который выполняет задачи, отдельные потоки для цикла событий. Таким образом, вам не нужно беспокоиться о том, что ваши блокирующие задачи блокируют цикл событий. Если вы используете отдельный цикл, ваши асинхронные функции из цикла событий не смогут ожидать результатов блокировки функций, выполняемых в новом цикле.

Цикл событий управляет этим, создавая будущее для представления задачи исполнителя. Затем он запускает задачу блокировки в одном из потоков исполнителей, и когда задача-исполнитель возвращает результат future, устанавливается и управление возвращается ожидающей функции в цикле событий (если таковой имеется).

Комментарии:

1. Большое спасибо. Итак, приведенный выше код кажется правильным? Там был создан один цикл событий, и он использовался для блокирующих и неблокирующих задач.

2. Это хорошо, за исключением того, что вам следует избегать прохождения цикла событий в асинхронных функциях — это может привести к ошибкам. Вместо этого используйте asyncio.get_running_loop() , если асинхронной функции требуется доступ к циклу событий. Это новое в версии 3.7, так что используйте get_event_loop() , если вы используете версию 3.6 или ниже.