#python-3.x #python-asyncio
#python-3.x #python-asyncio
Вопрос:
В приведенном здесь примере кода все задачи asyncio запускаются первыми. После этого задачи возобновляются, если операция ввода-вывода завершена.
Вывод выглядит следующим образом, где вы можете увидеть 6 сообщений о результатах после первых 6 сообщений о запуске.
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.
Но чего я ожидал бы и что могло бы ускорить работу в моих случаях, это что-то вроде этого
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
1938204 size for https://www.b-i-t-online.de/bitrss.xml
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
38697 size for https://jamanetwork.com/rss/site_3/67.xml
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.
В моем реальном коде у меня есть сотни загрузочных задач, подобных этой. Обычно некоторые загрузки завершаются до того, как все они начаты.
Есть ли способ справиться с этим с помощью asyncio
?
Вот минимальный рабочий пример:
#!/usr/bin/env python3
import random
import urllib.request
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
loop = asyncio.get_event_loop()
urls = ['https://www.b-i-t-online.de/bitrss.xml',
'https://jamanetwork.com/rss/site_3/67.xml',
'http://twitrss.me/twitter_user_to_rss/?user=cochranecollab']
async def parse_one_url(u):
print('-- Starting {}...'.format(u))
r = await loop.run_in_executor(executor,
urllib.request.urlopen, u)
r = '{} size for {}'.format(len(r.read()), u)
print(r)
async def do_async_parsing():
tasks = [
parse_one_url(u)
for u in urls
]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('FINISHED with {} results from {} tasks.'
.format(len(results), len(tasks)))
if __name__ == '__main__':
# blow up the urls
urls = urls * 2
random.shuffle(urls)
try:
#loop.set_debug(True)
loop.run_until_complete(do_async_parsing())
finally:
loop.close()
Побочный вопрос: разве asyncio
это не бесполезно в моем случае? Не проще ли использовать только несколько потоков?
Ответ №1:
В моем реальном коде у меня есть сотни загрузочных задач, подобных этой. Обычно некоторые загрузки завершаются до того, как все они начаты.
Что ж, вы создали все загрузки заранее и поручили asyncio запустить их все с помощью asyncio.wait
. Простое начало выполнения сопрограммы практически бесплатно, поэтому нет причин каким-либо образом ограничивать эту часть. Однако задачи, фактически отправленные в пул, ThreadPoolExecutor
ограничены количеством рабочих элементов в пуле, по умолчанию в 5 раз превышающим количество процессоров, но настраиваемым. Если количество URL-адресов превышает количество рабочих, вы должны получить желаемое поведение. (Но чтобы действительно наблюдать за этим, вам нужно переместить распечатки журналов в функцию, управляемую исполнителем.)
Обратите внимание, что синхронный вызов r.read()
также должен находиться внутри функции, запускаемой исполнителем, иначе он заблокирует весь цикл событий. Исправленная часть кода будет выглядеть следующим образом:
def urlopen(u):
print('-- Starting {}...'.format(u))
r = urllib.request.urlopen(u) # blocking call
content = r.read() # another blocking call
print('{} size for {}'.format(len(content), u))
async def parse_one_url(u):
await loop.run_in_executor(executor, urlopen, u)
Однако вышесказанное не является идиоматическим использованием asyncio. Обычно идея заключается в том, что вы вообще не используете потоки, а вызываете изначально асинхронный код, например, используя aiohttp. Тогда вы получаете преимущества asyncio, такие как отмена работы и масштабируемость для большого количества задач. В этой настройке вы бы ограничили количество одновременных задач, тривиально обернув извлечение в asyncio.Semaphore
.
Если вся ваша фактическая логика состоит из синхронных вызовов, вам вообще не нужен asyncio; вы можете напрямую отправлять фьючерсы исполнителю и использовать concurrent.futures
функции синхронизации, такие как wait()
и as_completed
, чтобы дождаться их завершения.
Комментарии:
1. Я увеличиваю URL-адреса до
245 * 1150 = 281750
. Все они запускаются до того, как какие-либо результаты будут напечатаны вstdout
. В моем примере из реального мира этоaiohttp
не выбор, потому что пакет сторонних разработчиков, который я использую, использует блокировку ввода-вывода.2. Хорошо, я понял. Запуск не означает, что
feedparser.parse()
вызывается. Но именно в этом моя проблема. Ее следует вызывать перед запуском всех задач.3. @buhtz Хорошо, проблема заключается в методологии. Ваша запись в журнал находится не в методе, который фактически отправлен в пул потоков, а в сопрограмме. Если вы переместите их , вы получите ожидаемый результат. Я соответствующим образом изменил ответ.