Возобновить выполнение асинхронной задачи перед запуском всех задач

#python-3.x #python-asyncio

#python-3.x #python-asyncio

Вопрос:

В приведенном здесь примере кода все задачи asyncio запускаются первыми. После этого задачи возобновляются, если операция ввода-вывода завершена.

Вывод выглядит следующим образом, где вы можете увидеть 6 сообщений о результатах после первых 6 сообщений о запуске.

 -- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.
  

Но чего я ожидал бы и что могло бы ускорить работу в моих случаях, это что-то вроде этого

 -- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
1938204 size for https://www.b-i-t-online.de/bitrss.xml
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
38697 size for https://jamanetwork.com/rss/site_3/67.xml
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.
  

В моем реальном коде у меня есть сотни загрузочных задач, подобных этой. Обычно некоторые загрузки завершаются до того, как все они начаты.

Есть ли способ справиться с этим с помощью asyncio ?

Вот минимальный рабочий пример:

 #!/usr/bin/env python3
import random
import urllib.request
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
loop = asyncio.get_event_loop()
urls = ['https://www.b-i-t-online.de/bitrss.xml',
        'https://jamanetwork.com/rss/site_3/67.xml',
        'http://twitrss.me/twitter_user_to_rss/?user=cochranecollab']

async def parse_one_url(u):
    print('-- Starting {}...'.format(u))
    r = await loop.run_in_executor(executor,
                                   urllib.request.urlopen, u)
    r = '{} size for {}'.format(len(r.read()), u)
    print(r)

async def do_async_parsing():
    tasks = [
        parse_one_url(u)
        for u in urls
            ]

    completed, pending = await asyncio.wait(tasks)
    results = [task.result() for task in completed]

    print('FINISHED with {} results from {} tasks.'
          .format(len(results), len(tasks)))

if __name__ == '__main__':
    # blow up the urls
    urls = urls * 2
    random.shuffle(urls)
    try:
        #loop.set_debug(True)
        loop.run_until_complete(do_async_parsing())
    finally:
        loop.close()
  

Побочный вопрос: разве asyncio это не бесполезно в моем случае? Не проще ли использовать только несколько потоков?

Ответ №1:

В моем реальном коде у меня есть сотни загрузочных задач, подобных этой. Обычно некоторые загрузки завершаются до того, как все они начаты.

Что ж, вы создали все загрузки заранее и поручили asyncio запустить их все с помощью asyncio.wait . Простое начало выполнения сопрограммы практически бесплатно, поэтому нет причин каким-либо образом ограничивать эту часть. Однако задачи, фактически отправленные в пул, ThreadPoolExecutor ограничены количеством рабочих элементов в пуле, по умолчанию в 5 раз превышающим количество процессоров, но настраиваемым. Если количество URL-адресов превышает количество рабочих, вы должны получить желаемое поведение. (Но чтобы действительно наблюдать за этим, вам нужно переместить распечатки журналов в функцию, управляемую исполнителем.)

Обратите внимание, что синхронный вызов r.read() также должен находиться внутри функции, запускаемой исполнителем, иначе он заблокирует весь цикл событий. Исправленная часть кода будет выглядеть следующим образом:

 def urlopen(u):
    print('-- Starting {}...'.format(u))
    r = urllib.request.urlopen(u)  # blocking call
    content = r.read()             # another blocking call
    print('{} size for {}'.format(len(content), u))

async def parse_one_url(u):
    await loop.run_in_executor(executor, urlopen, u)
  

Однако вышесказанное не является идиоматическим использованием asyncio. Обычно идея заключается в том, что вы вообще не используете потоки, а вызываете изначально асинхронный код, например, используя aiohttp. Тогда вы получаете преимущества asyncio, такие как отмена работы и масштабируемость для большого количества задач. В этой настройке вы бы ограничили количество одновременных задач, тривиально обернув извлечение в asyncio.Semaphore .

Если вся ваша фактическая логика состоит из синхронных вызовов, вам вообще не нужен asyncio; вы можете напрямую отправлять фьючерсы исполнителю и использовать concurrent.futures функции синхронизации, такие как wait() и as_completed , чтобы дождаться их завершения.

Комментарии:

1. Я увеличиваю URL-адреса до 245 * 1150 = 281750 . Все они запускаются до того, как какие-либо результаты будут напечатаны в stdout . В моем примере из реального мира это aiohttp не выбор, потому что пакет сторонних разработчиков, который я использую, использует блокировку ввода-вывода.

2. Хорошо, я понял. Запуск не означает, что feedparser.parse() вызывается. Но именно в этом моя проблема. Ее следует вызывать перед запуском всех задач.

3. @buhtz Хорошо, проблема заключается в методологии. Ваша запись в журнал находится не в методе, который фактически отправлен в пул потоков, а в сопрограмме. Если вы переместите их , вы получите ожидаемый результат. Я соответствующим образом изменил ответ.