#python #python-asyncio #aiohttp #python-3.8
#python #python-asyncio #aiohttp #python-3.8
Вопрос:
Я пишу простой клиент для взаимодействия с возможно перегруженным и ненадежным веб-сервером. Я предполагаю, что для любого отдельного запроса сервер может никогда не ответить (запрос будет тайм-аут), или может ответить с ошибкой после длительной задержки.
Из-за этого для каждого «запроса» я хочу выдавать повторные запросы в соответствии со следующей логикой:
- Повторяйте запрос (отправляйте новый запрос) каждую секунду, пока один из выданных запросов не получит ожидаемый ответ
- Новые запросы должны выдаваться одновременно с существующими запросами — существующие запросы должны оставаться открытыми до тех пор, пока они не получат ответ или тайм-аут.
- После получения ожидаемого ответа отмените (закройте) все запросы, которые все еще находятся на рассмотрении / открыты.
Я могу выполнить что-то близкое к этому, если в начале я выполняю фиксированное количество запросов одновременно, а затем использую asyncio.as_completed()
для обработки запросов по мере их завершения и отмены всех оставшихся ожидающих запросов:
import asyncio
import logging
import random
import time
from sys import stdout
class FailedRequest(Exception):
pass
async def get():
'''A simple mock async GET request that returns a randomized status after a randomized delay'''
await asyncio.sleep(random.uniform(0,10))
return random.choices([200, 500], [0.2, 0.8])[0]
async def fetch(id):
'''Makes a request using get(), checks response, and handles cancellation'''
logging.info(f"Sending request {id}.")
start_time = time.perf_counter()
try:
response = await get()
elapsed_time = time.perf_counter() - start_time
if response != 200:
logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
raise FailedRequest()
else:
logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
except asyncio.CancelledError:
logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
raise
async def main():
# Create 10 unique Tasks that wrap the fetch() coroutine
tasks = [asyncio.create_task(fetch(i)) for i in range(10)]
# Iterate through the tasks as they are completed
for coro in asyncio.as_completed(tasks):
try:
# Wait for the next task to finish. If the request errored out,
# this line will raise a FailedRequest exception (caught below)
await coro
# If we get here, then a request succeeded. Cancel all of the tasks we started.
for t in tasks:
t.cancel()
except (FailedRequest, asyncio.CancelledError) as e:
pass
logging.info("Finished!")
if __name__ == '__main__':
logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
random.seed(3)
asyncio.run(main())
Вывод:
2020-09-22 18:07:35,634:INFO: Sending request 0.
2020-09-22 18:07:35,635:INFO: Sending request 1.
2020-09-22 18:07:35,635:INFO: Sending request 2.
2020-09-22 18:07:35,635:INFO: Sending request 3.
2020-09-22 18:07:35,636:INFO: Sending request 4.
2020-09-22 18:07:35,636:INFO: Sending request 5.
2020-09-22 18:07:35,636:INFO: Sending request 6.
2020-09-22 18:07:35,636:INFO: Sending request 7.
2020-09-22 18:07:35,636:INFO: Sending request 8.
2020-09-22 18:07:35,637:INFO: Sending request 9.
2020-09-22 18:07:35,786:ERROR: Request 6 failed after 0.15s: 500
2020-09-22 18:07:36,301:ERROR: Request 5 failed after 0.66s: 500
2020-09-22 18:07:37,993:ERROR: Request 9 failed after 2.35s: 500
2020-09-22 18:07:38,023:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:07:38,236:ERROR: Request 8 failed after 2.60s: 500
2020-09-22 18:07:39,351:INFO: Request 2 succeeded (200) after 3.72s!
2020-09-22 18:07:39,351:INFO: Cancelled request 1 after 3.72s.
2020-09-22 18:07:39,351:INFO: Cancelled request 3 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 4 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 7 after 3.72s.
2020-09-22 18:07:39,352:INFO: Finished!
Тем не менее, я изо всех сил пытаюсь понять простой способ начать с выдачи одного запроса, а затем отправлять дополнительные запросы каждую секунду, пока один из запросов не будет выполнен успешно, при этом отслеживая все незавершенные запросы и отменяя все, что еще находится на рассмотрении.
Это настолько близко, насколько я понял:
import asyncio
import logging
import random
import time
from sys import stdout
class FailedRequest(Exception):
pass
async def get():
'''A simple mock async GET request that returns a randomized status after a randomized delay'''
await asyncio.sleep(random.uniform(0,10))
return random.choices([200, 500], [0.2, 0.8])[0]
async def fetch(id):
'''Makes a request using get(), checks response, and handles cancellation'''
logging.info(f"Sending request {id}.")
start_time = time.perf_counter()
try:
response = await get()
elapsed_time = time.perf_counter() - start_time
if response != 200:
logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
raise FailedRequest()
else:
logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
except asyncio.CancelledError:
logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
raise
async def issue_requests(finished, requests):
i = 0
while not finished.is_set():
requests.add(asyncio.create_task(fetch(i)))
await asyncio.sleep(1)
i = 1
async def handle_requests(finished, requests):
# Iterate through the requests as they are completed
for coro in asyncio.as_completed(requests):
try:
# Wait for the next task to finish. If the request errored out,
# this line will raise a FailedRequest exception (caught below)
await coro
# If we get here, then a request succeeded. Cancel all of the tasks we started.
finished.set()
for r in requests:
r.cancel()
except (FailedRequest, asyncio.CancelledError):
pass
async def main():
finished = asyncio.Event()
requests = set()
await asyncio.gather(issue_requests(finished, requests), handle_requests(finished, requests))
logging.info("Finished!")
if __name__ == '__main__':
logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
random.seed(3)
asyncio.run(main())
Однако, хотя запросы запускаются, как и ожидалось, процесс не останавливается при возврате первого успешного запроса:
2020-09-22 18:03:38,256:INFO: Sending request 0.
2020-09-22 18:03:39,264:INFO: Sending request 1.
2020-09-22 18:03:40,265:INFO: Sending request 2.
2020-09-22 18:03:40,643:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:03:41,281:INFO: Sending request 3.
2020-09-22 18:03:42,281:INFO: Sending request 4.
2020-09-22 18:03:42,948:INFO: Request 4 succeeded (200) after 0.67s!
# requests 1, 2, and 3 should be cancelled here and the script should finish
2020-09-22 18:03:43,279:INFO: Sending request 5.
2020-09-22 18:03:43,976:ERROR: Request 2 failed after 3.71s: 500
2020-09-22 18:03:44,281:INFO: Sending request 6.
2020-09-22 18:03:44,718:ERROR: Request 1 failed after 5.45s: 500
2020-09-22 18:03:45,295:INFO: Sending request 7.
2020-09-22 18:03:46,307:INFO: Sending request 8.
...
Я думаю, проблема в том, что когда asyncio.as_completed(requests)
вызывается handle_requests()
, requests
это пустой набор, поэтому as_completed()
возвращает пустой итератор и handle_requests()
немедленно возвращается.
Кажется, что это должно быть возможно сделать asyncio
на высоком уровне, но я изо всех сил пытаюсь понять это.
Ответ №1:
Вы могли бы сделать это так, например (используя while
цикл):
is_finished = False
tasks = []
def cancel_tasks():
for t in tasks:
t.cancel()
async def fetch(count):
'''Makes a request using get(), checks response, and handles cancellation'''
logging.info(f"Sending request {count}.")
start_time = time.perf_counter()
try:
response = await get()
elapsed_time = time.perf_counter() - start_time
if response != 200:
logging.error(f"Request {count} failed after {elapsed_time:.2f}s: {response}")
raise FailedRequest()
else:
global is_finished
is_finished = True
logging.info(f"Request {count} succeeded ({response}) after {elapsed_time:.2f}s!")
cancel_tasks()
except asyncio.CancelledError:
logging.info(f"Cancelled request {count} after {time.perf_counter() - start_time:.2f}s.")
raise
async def main():
count = 0
while not is_finished:
tasks.append(asyncio.create_task(fetch(count)))
await asyncio.sleep(1)
count = 1
# Wait for all tasks to cancel:
await asyncio.wait(tasks)
logging.info("Finished!")
РЕДАКТИРОВАТЬ: немного улучшено, так что он отменяет все задачи как можно скорее, а затем ожидает, пока все они будут отменены, прежде чем регистрировать «Завершено»
Комментарии:
1. Это работает, хотя установка флага finished и отмена других задач внутри
fetch()
кажется мне запахом кода. Что, еслиfetch()
нет видимости для других задач илиis_finished
?2. Другим недостатком этого подхода является то, что
tasks
список будет включать завершенныеTask
объекты. Если запросы постоянно возвращают коды ошибок иmain()
выполняются в течение длительного времени,tasks
список будет очень большим, и итерация по нему для отмены небольшого количества ожидающих выполнения задач может занять нетривиальное количество времени. Этот подход требует чистого способа удаления завершенных задач из списка, поэтомуcancel_tasks()
работает только с теми задачами, которые все еще находятся на рассмотрении.3. Я только что показал вам, как достичь того, чего вы хотите, вам решать, сделать ваш код более чистым и оптимизированным. Вместо
tasks
массива используйте словарь, в котором вы храните задачу поcount
ключу. Таким образом, вы сможете удалить его после сбоя. Другой оптимизацией было бы не запускать новый запрос, если у вас N количество ожидающих выполнения задач. Вместо определения логики для завершения в методе fetch() просто вызовитеhandle_finished
метод при успешном выполнении и в этом методе отмените все задачи и установитеis_finished
флаг наTrue
4. Согласен, это правильное решение, хотя я надеялся, что этот вопрос может превратиться в обсуждение написания идиоматического асинхронного Python. Что я действительно ищу, так это решение, которое предполагает
fetch()
, что это произвольная функция, и не требует, чтобы она вызывала определенную функцию для обработки отмены других задач, поскольку, напримерfetch()
, может быть предоставлен сторонним модулем. Представленное вами решение по-прежнему кажется мне неэлегантным, как будто должен быть лучший способ сделать это с использованием высокоуровневыхasyncio
примитивов, но я ценю ваш вклад.5. Не уверен, как вы могли предположить, что
fetch()
это произвольная функция. Вы должны написать свою собственную абстракцию, чтобы достичь того, чего вы хотите. В вашем примере я думаю, что этоget()
метод, который можно считать произвольной функцией, которая выполняет HTTP-запрос. Возможно, вас действительно беспокоит именование методов? Я бы, вероятно, обернул весь этот код в одну вызываемую функциюfetch_persistently(url)
или что-то в этом роде (в значительной степени ваш текущийmain()
), переименовав вашfetch()
вfetch_attempt()
. Так что для каждогоfetch_persistently
вызова создается отдельная переменная с ограниченной областью действияtasks
Ответ №2:
Ваши требования на первый взгляд обманчиво просты, но на самом деле требуют некоторого размышления. Если я правильно их понял, вы хотите что-то вроде этого:
async def _spawner(async_fn, done):
# spawn a new task every second, notifying the caller when
# any task completes
running = set()
def when_done(task):
running.remove(task)
done.put_nowait(task)
while True:
new_task = asyncio.create_task(async_fn())
new_task.add_done_callback(when_done)
running.add(new_task)
try:
await asyncio.sleep(1)
except asyncio.CancelledError:
# we're canceled, cancel the tasks that are still running
for t in running:
t.cancel()
raise
async def try_until_successful(async_fn):
done = asyncio.Queue()
# run the spawner in the background so it can run
# independently of us
spawner_task = asyncio.create_task(_spawner(async_fn, done))
# collect completed tasks until the one we're happy with
while True:
task = await done.get()
if task.exception() is None:
# task didn't raise - we got our result!
spawner_task.cancel()
return task.result()
Эти функции ничего не предполагают о сопрограмме, переданной как async_fn
. Вы можете использовать lambda
или functools.partial
для создания любого вызываемого объекта, если в конечном итоге он возвращает объект сопрограммы (объект, возвращаемый простым вызовом async def
и то, что вы обычно передаете await
). Кроме того, они не используют глобальные флаги, поэтому вы можете ожидать несколько экземпляров try_until_successful
параллельно.
Для вызова fetch()
с монотонно увеличивающимися идентификаторами вы могли бы назвать это так:
async def main():
cnt = 0
async def fetch_incrementing():
nonlocal cnt
cnt = 1
await fetch(cnt)
await try_until_successful(fetch_incrementing)
Комментарии:
1. Как вы думаете, стоит ли хранить дополнительный флаг в вашем
while
цикле, где вы используетеsleep(1)
? Я имею в виду, чтобы это былоwhile not is_done:
и нетwhile True:
. Я полагаю, вы предполагаете, что как толькоdone.put_nowait(task)
вызывается, мы синхронно переходим к нашему циклу, в котором мы вызывалиtask = await done.get()
. Но верно ли это предположение? Я думаюawait done.get()
, что будет возвращать значение асинхронно, поэтому есть шанс (возможно, очень маленький), которыйsleep(1)
будет завершен до этого, и это создаст дополнительную задачу выборки. Мысли?2. @GProst Это очень интересный вопрос. Когда
when_done
срабатывает,_spawner
ожидаетsleep()
иtry_until_successful
ожидаетqueue.get()
, они являются единственнымиawait
в соответствующих сопрограммах.put_nowait
вызоветqueue.get()
пробуждение, но это произойдет на следующей итерации цикла событий, назовем ее итерацией 1. Как вы правильно указали, возможно (если маловероятно)sleep(1)
, что на итерации 1 истечет время, и в этом случае оба_spawner
иtry_until_successful
могут быть возобновлены на этой итерации, и не указано, какой из них это будет.3. Таким образом, возможно запланировать дополнительное
fetch
задание, но оно не начнет выполняться до следующей итерации, итерации 2. (Именно так работает большая часть asyncio.) И на итерации 1 возобновлениеtry_until_successful
все еще не завершено, что приведет к отмене создателя. Отмена Спавнера будет обработана на итерации 2, той же итерации, когдаfetch
также запускается new . Программа создания реагирует на отмену, отменяя все запущенные задачи (включая ненужные новыеfetch
), но эта отмена будет обработана только на итерации 3.4. Так что да, дополнительный
fetch()
запуск начнется и у него будет время, например, открыть сетевой запрос, прежде чем он будет отменен, хороший улов! Но я не уверен, что добавление условия стоит усложнять код. В настоящее время существует разделение проблем между_spawner
(который порождает новые задачи до отмены) иtry_until_successful
(который решает, что значит быть «успешным»). Мы могли бы, конечно, передать логическое значение в штучной упаковке, которое try_ … могло бы использовать, чтобы сообщить spawner о выходе, но это кажется слишком сложным для крайнего случая системы, которая предназначена для работы с максимальными усилиями.5. Если этот крайний случай когда-либо является проблемой для OP, я бы оставил добавление флага в качестве упражнения для читателя.