Каков правильный способ объединить async-for с условием if для прерывания ожидания в середине?

#python #python-3.x #asynchronous #python-asyncio

#python #python-3.x #асинхронный #python-asyncio

Вопрос:

Если у меня есть сопрограмма, которая использует элементы из асинхронного генератора, каков «лучший» способ завершить этот цикл из внешнего условия?

Рассмотрим это,

 while not self.shutdown_event.is_set():
    async with self.external_lib_client as client:
        async for message in client:
            if self.shutdown_event.is_set():
                break
            await self.handle(message)
 

Если я установлю shutdown_event , он выйдет из цикла while, но не раньше, чем следующий message будет обработан async for циклом. Каков правильный способ структурировать async for итератор таким образом, чтобы он мог замыкаться, если между ним было выполнено условие, дающее результаты?

Существует ли стандартный способ добавления a Timeout ?

Комментарии:

1. Добавление if внутри тела цикла не поможет. Вам нужно будет изменить сам итератор, возможно, используя какой-то итератор-оболочку тайм-аута.

Ответ №1:

Одним из способов было бы переместить итерацию в an async def и использовать отмену:

 async def iterate(client):
    async for message in client:
        # shield() because we want cancelation to cancel retrieval
        # of the next message, not ongoing handling of a message
        await asyncio.shield(self.handle(message))

async with self.external_lib_client as client:
    iter_task = asyncio.create_task(iterate(client))
    shutdown_task = asyncio.create_task(self.shutdown_event.wait())
    await asyncio.wait([iter_task, shutdown_task],
                       return_when=asyncio.FIRST_COMPLETED)
    if iter_task.done():
        # iteration has completed, access result to propagate the
        # exception if one was raised
        iter_task.result()
        shutdown_task.cancel()
    else:
        # shutdown was requested, cancel iteration
        iter_task.cancel()
 

Другим способом было бы превратиться shutdown_event в однократный асинхронный поток и использовать aiostream для мониторинга обоих. Таким образом for , цикл получает объект при сигнале о событии завершения работы и может выйти из цикла, не потрудившись завершить ожидание следующего сообщения:

 # a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())

async with self.external_lib_client as client, 
        aiostream.stream.merge(done_stream, client).stream() as stream:
    async for message in stream:
        # the merged stream will provide a bogus value (whatever
        # `shutdown_event.wait()` returned) when the event is set,
        # so check that before using `message`:
        if self.shutdown_event.is_set():
            break
        await self.handle(message)
 

Примечание: поскольку код в вопросе недоступен для выполнения, приведенные выше примеры не проверены.

Комментарии:

1. Блестяще! aiostream не вернулся ни в одном из моих поисков. Спасибо

2. @blakev Обратите внимание, что разница между двумя вариантами заключается в том, что в первом случае __anext__ он будет отменен в случае события завершения работы. Во втором случае он просто продолжит работать в фоновом режиме, только чтобы быть отброшенным, как только это будет сделано. Первое можно считать более чистым.

3. оба решения отлично сработали, большое вам спасибо!