#python #python-asyncio #httpx
#python #python-asyncio #httpx
Вопрос:
Я хотел бы читать из нескольких одновременных запросов потоковой передачи HTTP внутри сопрограмм, используя httpx, и возвращать данные обратно в мою неасинхронную функцию, выполняющую цикл событий, а не просто возвращать окончательные данные.
Но если я заставлю свои асинхронные функции выдавать вместо возврата, я получаю жалобы, которые asyncio.as_completed()
и loop.run_until_complete()
ожидают сопрограмму или будущее, а не асинхронный генератор.
Итак, единственный способ заставить это работать вообще — это собирать все потоковые данные внутри каждой сопрограммы, возвращая все данные после завершения запроса. Затем соберите все результаты сопрограммы и, наконец, верните их неасинхронной вызывающей функции.
Это означает, что я должен хранить все в памяти и ждать, пока не завершится самый медленный запрос, прежде чем я получу все свои данные, что сводит на нет весь смысл потоковых http-запросов.
Есть ли какой-либо способ, которым я могу выполнить что-то подобное? Моя текущая глупая реализация выглядит так:
def collect_data(urls):
"""Non-async function wishing it was a non-async generator"""
async def stream(async_client, url, payload):
data = []
async with async_client.stream("GET", url=url) as ar:
ar.raise_for_status()
async for line in ar.aiter_lines():
data.append(line)
# would like to yield each line here
return data
async def execute_tasks(urls):
all_data = []
async with httpx.AsyncClient() as async_client:
tasks = [stream(async_client, url) for url in urls]
for coroutine in asyncio.as_completed(tasks):
all_data = await coroutine
# would like to iterate and yield each line here
return all_events
try:
loop = asyncio.get_event_loop()
data = loop.run_until_complete(execute_tasks(urls=urls))
return data
# would like to iterate and yield the data here as it becomes available
finally:
loop.close()
Я также пробовал некоторые решения, использующие каналы памяти asyncio.Queue
и trio
, но поскольку я могу читать только из тех, что находятся в области асинхронности, это не приближает меня к решению.
Причина, по которой я хочу использовать это из неасинхронного генератора, заключается в том, что я хочу использовать его из приложения Django с использованием потокового API Django Rest Framework.
Комментарии:
1. Я не уверен, что понимаю. Почему вы не можете просто вызвать вариант метода
collect_data
insidestream
? То, как вы пытаетесь сделать, т. Е. Вызвать что-то послеloop.run_until_complete
, заставляет вас сохранять все в памяти. Что, как вы правильно заметили, противоречит идее потоковой передачи. Этого невозможно избежать, посколькуloop.run_until_complete
это не генератор. Другими словами, если вы не можете изменитьcollect_data
и вызвать его внутриstream
метода, то вы мало что можете сделать.2. Почему
collect_data
должна быть синхронизация? Обычно при использовании асинхронного кода в любом месте требуется, чтобы большая часть программы была асинхронной.3. Обновил вопрос и упомянул, что я хочу представить это в приложении Django DRF, работающем синхронно. Вполне может быть, что идея сделать это из неасинхронного кода глупа, и мне нужно все переосмыслить 🙂
Ответ №1:
Обычно вы должны просто сделать collect_data
асинхронным и использовать асинхронный код повсюду — именно так был разработан asyncio для использования. Но если это по какой-то причине невозможно, вы можете выполнить итерацию асинхронного итератора вручную, применив некоторый код склеивания:
def iter_over_async(ait, loop):
ait = ait.__aiter__()
async def get_next():
try:
obj = await ait.__anext__()
return False, obj
except StopAsyncIteration:
return True, None
while True:
done, obj = loop.run_until_complete(get_next())
if done:
break
yield obj
Способ, которым это работает, заключается в обеспечении асинхронного замыкания, которое продолжает извлекать значения из асинхронного итератора с использованием __anext__
магического метода и возвращает объекты по мере их поступления. Это асинхронное закрытие вызывается с run_until_complete()
в цикле внутри обычного генератора синхронизации. (Закрытие фактически возвращает пару выполненного индикатора и фактического объекта, чтобы избежать StopAsyncIteration
сквозного run_until_complete
распространения, которое может быть неподдерживаемым.)
С помощью этого вы можете создать свой execute_tasks
асинхронный генератор ( async def
с yield
помощью) и выполнить итерацию по нему, используя:
for chunk in iter_over_async(execute_tasks(urls), loop):
...
Просто обратите внимание, что этот подход несовместим с asyncio.run
и может вызвать проблемы в дальнейшем.
Комментарии:
1. Это решение, похоже, работает хорошо, и спасибо за предупреждение, когда я перейду к
python>=3.7
. Я также закончил тем, что использовал aiostream.stream.merge в «задачах» для перебора всех асинхронных генераторов «одновременно».2. Хорошее использование aiostream. 🙂 Обратите внимание, что ваш код будет отлично работать с Python 3.7 и более позднимиверсиями,
loop.run_until_complete()
никуда не денется. Просто общие рекомендации смещаются в сторонуasyncio.run
, поэтому в какой-то момент ваш дизайн может отстать, и вы, возможно, захотите переосмыслить его.3. Это блестяще и именно то, что мне было нужно. Я скорректировал подпись, чтобы параметр цикла был необязательным, по умолчанию равным
asyncio.get_event_loop()
. Тогда его вызов еще более тривиален.
Ответ №2:
Просто хочу обновить решение @user4815162342 для использования asyncio.run_coroutine_threadsafe
вместо loop.run_until_complete
.
import asyncio
from typing import Any, AsyncGenerator
def _iter_over_async(loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator):
ait = async_generator.__aiter__()
async def get_next() -> tuple[bool, Any]:
try:
obj = await ait.__anext__()
done = False
except StopAsyncIteration:
obj = None
done = True
return done, obj
while True:
done, obj = asyncio.run_coroutine_threadsafe(get_next(), loop).result()
if done:
break
yield obj
Я также хотел бы добавить, что я нашел подобные инструменты весьма полезными в процессе кусочного преобразования синхронного кода в код asyncio.
Ответ №3:
Существует хорошая библиотека, которая делает это (и многое другое!), называемая pypeln:
import pypeln as pl
import asyncio
from random import random
async def slow_add1(x):
await asyncio.sleep(random()) # <= some slow computation
return x 1
async def slow_gt3(x):
await asyncio.sleep(random()) # <= some slow computation
return x > 3
data = range(10) # [0, 1, 2, ..., 9]
stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)
data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]