Возможно получение данных генератора asyncio из цикла событий?

#python #python-asyncio #httpx

#python #python-asyncio #httpx

Вопрос:

Я хотел бы читать из нескольких одновременных запросов потоковой передачи HTTP внутри сопрограмм, используя httpx, и возвращать данные обратно в мою неасинхронную функцию, выполняющую цикл событий, а не просто возвращать окончательные данные.

Но если я заставлю свои асинхронные функции выдавать вместо возврата, я получаю жалобы, которые asyncio.as_completed() и loop.run_until_complete() ожидают сопрограмму или будущее, а не асинхронный генератор.

Итак, единственный способ заставить это работать вообще — это собирать все потоковые данные внутри каждой сопрограммы, возвращая все данные после завершения запроса. Затем соберите все результаты сопрограммы и, наконец, верните их неасинхронной вызывающей функции.

Это означает, что я должен хранить все в памяти и ждать, пока не завершится самый медленный запрос, прежде чем я получу все свои данные, что сводит на нет весь смысл потоковых http-запросов.

Есть ли какой-либо способ, которым я могу выполнить что-то подобное? Моя текущая глупая реализация выглядит так:

 def collect_data(urls):
    """Non-async function wishing it was a non-async generator"""

    async def stream(async_client, url, payload):
        data = []
        async with async_client.stream("GET", url=url) as ar:
            ar.raise_for_status()
            async for line in ar.aiter_lines():
                data.append(line)
                # would like to yield each line here
        return data

    async def execute_tasks(urls):
        all_data = []
        async with httpx.AsyncClient() as async_client:
            tasks = [stream(async_client, url) for url in urls]
            for coroutine in asyncio.as_completed(tasks):
                all_data  = await coroutine
                # would like to iterate and yield each line here
        return all_events

    try:
        loop = asyncio.get_event_loop()
        data = loop.run_until_complete(execute_tasks(urls=urls))
        return data
        # would like to iterate and yield the data here as it becomes available
    finally:
        loop.close()
  

Я также пробовал некоторые решения, использующие каналы памяти asyncio.Queue и trio , но поскольку я могу читать только из тех, что находятся в области асинхронности, это не приближает меня к решению.

Причина, по которой я хочу использовать это из неасинхронного генератора, заключается в том, что я хочу использовать его из приложения Django с использованием потокового API Django Rest Framework.

Комментарии:

1. Я не уверен, что понимаю. Почему вы не можете просто вызвать вариант метода collect_data inside stream ? То, как вы пытаетесь сделать, т. Е. Вызвать что-то после loop.run_until_complete , заставляет вас сохранять все в памяти. Что, как вы правильно заметили, противоречит идее потоковой передачи. Этого невозможно избежать, поскольку loop.run_until_complete это не генератор. Другими словами, если вы не можете изменить collect_data и вызвать его внутри stream метода, то вы мало что можете сделать.

2. Почему collect_data должна быть синхронизация? Обычно при использовании асинхронного кода в любом месте требуется, чтобы большая часть программы была асинхронной.

3. Обновил вопрос и упомянул, что я хочу представить это в приложении Django DRF, работающем синхронно. Вполне может быть, что идея сделать это из неасинхронного кода глупа, и мне нужно все переосмыслить 🙂

Ответ №1:

Обычно вы должны просто сделать collect_data асинхронным и использовать асинхронный код повсюду — именно так был разработан asyncio для использования. Но если это по какой-то причине невозможно, вы можете выполнить итерацию асинхронного итератора вручную, применив некоторый код склеивания:

 def iter_over_async(ait, loop):
    ait = ait.__aiter__()
    async def get_next():
        try:
            obj = await ait.__anext__()
            return False, obj
        except StopAsyncIteration:
            return True, None
    while True:
        done, obj = loop.run_until_complete(get_next())
        if done:
            break
        yield obj
  

Способ, которым это работает, заключается в обеспечении асинхронного замыкания, которое продолжает извлекать значения из асинхронного итератора с использованием __anext__ магического метода и возвращает объекты по мере их поступления. Это асинхронное закрытие вызывается с run_until_complete() в цикле внутри обычного генератора синхронизации. (Закрытие фактически возвращает пару выполненного индикатора и фактического объекта, чтобы избежать StopAsyncIteration сквозного run_until_complete распространения, которое может быть неподдерживаемым.)

С помощью этого вы можете создать свой execute_tasks асинхронный генератор ( async def с yield помощью) и выполнить итерацию по нему, используя:

 for chunk in iter_over_async(execute_tasks(urls), loop):
    ...
  

Просто обратите внимание, что этот подход несовместим с asyncio.run и может вызвать проблемы в дальнейшем.

Комментарии:

1. Это решение, похоже, работает хорошо, и спасибо за предупреждение, когда я перейду к python>=3.7 . Я также закончил тем, что использовал aiostream.stream.merge в «задачах» для перебора всех асинхронных генераторов «одновременно».

2. Хорошее использование aiostream. 🙂 Обратите внимание, что ваш код будет отлично работать с Python 3.7 и более позднимиверсиями, loop.run_until_complete() никуда не денется. Просто общие рекомендации смещаются в сторону asyncio.run , поэтому в какой-то момент ваш дизайн может отстать, и вы, возможно, захотите переосмыслить его.

3. Это блестяще и именно то, что мне было нужно. Я скорректировал подпись, чтобы параметр цикла был необязательным, по умолчанию равным asyncio.get_event_loop() . Тогда его вызов еще более тривиален.

Ответ №2:

Просто хочу обновить решение @user4815162342 для использования asyncio.run_coroutine_threadsafe вместо loop.run_until_complete .

 import asyncio
from typing import Any, AsyncGenerator

def _iter_over_async(loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator):
    ait = async_generator.__aiter__()

    async def get_next() -> tuple[bool, Any]:
        try:
            obj = await ait.__anext__()
            done = False

        except StopAsyncIteration:
            obj = None
            done = True

        return done, obj

    while True:
        done, obj = asyncio.run_coroutine_threadsafe(get_next(), loop).result()

        if done:
            break

        yield obj
  

Я также хотел бы добавить, что я нашел подобные инструменты весьма полезными в процессе кусочного преобразования синхронного кода в код asyncio.

Ответ №3:

Существует хорошая библиотека, которая делает это (и многое другое!), называемая pypeln:

 import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x   1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]