Python 3.5 асинхронный / ожидающий с реальным примером кода

#python #asynchronous #async-await #python-3.5 #python-asyncio

#python #асинхронный #async-await #python-3.5 #python-asyncio

Вопрос:

Я прочитал множество статей и руководств о том, что такое Python 3.5 async / await. Я должен сказать, что я в замешательстве, потому что некоторые используют get_event_loop() и run_until_complete(), некоторые используют ensure_future(), некоторые используют asyncio.wait(), а некоторые используют call_soon() .

Похоже, у меня много вариантов, но я понятия не имею, полностью ли они идентичны или есть случаи, когда вы используете циклы, а есть случаи, когда вы используете wait() .

Но дело в том, что все примеры работают с asyncio.sleep() имитацией реальной медленной операции, которая возвращает ожидаемый объект. Как только я пытаюсь поменять эту строку на какой-то реальный код, все это терпит неудачу. В чем, черт возьми, различия между подходами, написанными выше, и как мне запустить стороннюю библиотеку, которая не готова к async / await. Я использую службу Quandl для извлечения некоторых исходных данных.

  import asyncio
 import quandl

 async def slow_operation(n):
     # await asyncio.sleep(1) # Works because it's await ready.
     await quandl.Dataset(n) # Doesn't work because it's not await ready.


 async def main():
     await asyncio.wait([
         slow_operation("SIX/US9884981013EUR4"),
         slow_operation("SIX/US88160R1014EUR4"),
     ])

 # You don't have to use any code for 50 requests/day.
 quandl.ApiConfig.api_key = "MY_SECRET_CODE"

 loop = asyncio.get_event_loop()
 loop.run_until_complete(main())
  

Я надеюсь, вы понимаете, насколько я чувствую себя потерянным и насколько простой вещью, которую я хотел бы запустить параллельно.

Ответ №1:

Если сторонняя библиотека несовместима с async/await , то, очевидно, вы не сможете ее легко использовать. Есть два случая:

  1. Предположим, что функция в библиотеке является асинхронной и выдает обратный вызов, например

     def fn(..., clb):
        ...
      

    Итак, вы можете сделать:

     def on_result(...):
        ...
    
    fn(..., on_result)
      

    В этом случае вы можете обернуть такие функции в протокол asyncio следующим образом:

     from asyncio import Future
    
    def wrapper(...):
        future = Future()
        def my_clb(...):
            future.set_result(xyz)
        fn(..., my_clb)
        return future
      

    (использовать future.set_exception(exc) при исключении)

    Затем вы можете просто вызвать эту оболочку в некоторой async функции с await :

     value = await wrapper(...)
      

    Обратите внимание, что это await работает с любым Future объектом. Вам не нужно объявлять wrapper as async .

  2. Если функция в библиотеке синхронна, вы можете запустить ее в отдельном потоке (вероятно, для этого вы бы использовали какой-нибудь пул потоков). Весь код может выглядеть следующим образом:

     import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    # Initialize 10 threads
    THREAD_POOL = ThreadPoolExecutor(10)
    
    def synchronous_handler(param1, ...):
        # Do something synchronous
        time.sleep(2)
        return "foo"
    
    # Somewhere else
    async def main():
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
            loop.run_in_executor(THREAD_POOL, synchronous_handler, param1, ...),
        ]
        await asyncio.wait(futures)
        for future in futures:
            print(future.result())
    
    with THREAD_POOL:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
      

Если вы по какой-либо причине не можете использовать потоки, то использование такой библиотеки просто делает весь асинхронный код бессмысленным.

Однако обратите внимание, что использование синхронной библиотеки с async, вероятно, является плохой идеей. Вы не получите многого, и все же вы сильно усложняете код.

Комментарии:

1. @grafa Мой плохой, я не тестировал код. Сейчас я это исправил и значительно упростил. На самом деле в Python есть эта аккуратная .run_in_executor функция, которая заботится о возврате значений и обработке исключений. Нет необходимости делать это вручную.

2. @grafa Обратите внимание, что если вы запланируете больше задач, чем размер пула потоков, они будут поставлены в очередь и будут ждать потока.

3. @grafa Также помните, что после этого вы должны завершить работу исполнителя. Подробнее об этом читайте здесь: docs.python.org/3/library/concurrent.futures.html

4. @grafa Сначала вы планируете вызовы, а затем можете использовать asyncio.wait() их для ожидания всех. В конце вы можете перебирать фьючерсы и получать результаты путем вызова future.result() . Смотрите обновленный код. Обратите внимание, что .run_in_executor уже возвращает объект future, его не нужно переносить.

5. Большое вам спасибо. Каждая мысль работает, и я думаю, я знаю все это. Вы первый на этой планете, кто предоставил сложный пример того, как использовать Python 3.5 async / await с классическими неасинхронными задачами. Еще раз спасибо!

Ответ №2:

Вы можете взглянуть на следующий простой рабочий пример отсюда. Кстати, он возвращает строку, которую стоит прочитать 🙂

 import aiohttp
import asyncio

async def fetch(client):
  async with client.get('https://docs.aiohttp.org/en/stable/client_reference.html') as resp:
    assert resp.status == 200
    return await resp.text()

async def main():
  async with aiohttp.ClientSession() as client:
    html = await fetch(client)
    print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())