asyncio.wait_for_completion() в динамическом списке задач

#python #python-asyncio #aiohttp #python-3.8

#python #python-asyncio #aiohttp #python-3.8

Вопрос:

Я пишу простой клиент для взаимодействия с возможно перегруженным и ненадежным веб-сервером. Я предполагаю, что для любого отдельного запроса сервер может никогда не ответить (запрос будет тайм-аут), или может ответить с ошибкой после длительной задержки.

Из-за этого для каждого «запроса» я хочу выдавать повторные запросы в соответствии со следующей логикой:

  • Повторяйте запрос (отправляйте новый запрос) каждую секунду, пока один из выданных запросов не получит ожидаемый ответ
  • Новые запросы должны выдаваться одновременно с существующими запросами — существующие запросы должны оставаться открытыми до тех пор, пока они не получат ответ или тайм-аут.
  • После получения ожидаемого ответа отмените (закройте) все запросы, которые все еще находятся на рассмотрении / открыты.

Я могу выполнить что-то близкое к этому, если в начале я выполняю фиксированное количество запросов одновременно, а затем использую asyncio.as_completed() для обработки запросов по мере их завершения и отмены всех оставшихся ожидающих запросов:

 import asyncio
import logging
import random
import time
from sys import stdout


class FailedRequest(Exception):
    pass

async def get():
    '''A simple mock async GET request that returns a randomized status after a randomized delay'''
    await asyncio.sleep(random.uniform(0,10))
    return random.choices([200, 500], [0.2, 0.8])[0]

async def fetch(id):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {id}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time
            
        if response != 200:
            logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
    except asyncio.CancelledError:
        logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def main():
    # Create 10 unique Tasks that wrap the fetch() coroutine
    tasks = [asyncio.create_task(fetch(i)) for i in range(10)]

    # Iterate through the tasks as they are completed
    for coro in asyncio.as_completed(tasks):
        try:
            # Wait for the next task to finish. If the request errored out,
            # this line will raise a FailedRequest exception (caught below)
            await coro

            # If we get here, then a request succeeded. Cancel all of the tasks we started.
            for t in tasks:
                t.cancel()

        except (FailedRequest, asyncio.CancelledError) as e:
            pass

    logging.info("Finished!")
                
if __name__ == '__main__':
    logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
    random.seed(3)
    asyncio.run(main())
  

Вывод:

 2020-09-22 18:07:35,634:INFO: Sending request 0.
2020-09-22 18:07:35,635:INFO: Sending request 1.
2020-09-22 18:07:35,635:INFO: Sending request 2.
2020-09-22 18:07:35,635:INFO: Sending request 3.
2020-09-22 18:07:35,636:INFO: Sending request 4.
2020-09-22 18:07:35,636:INFO: Sending request 5.
2020-09-22 18:07:35,636:INFO: Sending request 6.
2020-09-22 18:07:35,636:INFO: Sending request 7.
2020-09-22 18:07:35,636:INFO: Sending request 8.
2020-09-22 18:07:35,637:INFO: Sending request 9.
2020-09-22 18:07:35,786:ERROR: Request 6 failed after 0.15s: 500
2020-09-22 18:07:36,301:ERROR: Request 5 failed after 0.66s: 500
2020-09-22 18:07:37,993:ERROR: Request 9 failed after 2.35s: 500
2020-09-22 18:07:38,023:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:07:38,236:ERROR: Request 8 failed after 2.60s: 500
2020-09-22 18:07:39,351:INFO: Request 2 succeeded (200) after 3.72s!
2020-09-22 18:07:39,351:INFO: Cancelled request 1 after 3.72s.
2020-09-22 18:07:39,351:INFO: Cancelled request 3 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 4 after 3.72s.
2020-09-22 18:07:39,352:INFO: Cancelled request 7 after 3.72s.
2020-09-22 18:07:39,352:INFO: Finished!
  

Тем не менее, я изо всех сил пытаюсь понять простой способ начать с выдачи одного запроса, а затем отправлять дополнительные запросы каждую секунду, пока один из запросов не будет выполнен успешно, при этом отслеживая все незавершенные запросы и отменяя все, что еще находится на рассмотрении.

Это настолько близко, насколько я понял:

 import asyncio
import logging
import random
import time
from sys import stdout


class FailedRequest(Exception):
    pass

async def get():
    '''A simple mock async GET request that returns a randomized status after a randomized delay'''
    await asyncio.sleep(random.uniform(0,10))
    return random.choices([200, 500], [0.2, 0.8])[0]

async def fetch(id):
    '''Makes a request using get(), checks response, and handles cancellation'''
    logging.info(f"Sending request {id}.")
    start_time = time.perf_counter()
    try:
        response = await get()
        elapsed_time = time.perf_counter() - start_time
            
        if response != 200:
            logging.error(f"Request {id} failed after {elapsed_time:.2f}s: {response}")
            raise FailedRequest()
        else:
            logging.info(f"Request {id} succeeded ({response}) after {elapsed_time:.2f}s!")
    except asyncio.CancelledError:
        logging.info(f"Cancelled request {id} after {time.perf_counter() - start_time:.2f}s.")
        raise

async def issue_requests(finished, requests):
    i = 0
    while not finished.is_set():
        requests.add(asyncio.create_task(fetch(i)))
        await asyncio.sleep(1)
        i  = 1

async def handle_requests(finished, requests):
    # Iterate through the requests as they are completed
    for coro in asyncio.as_completed(requests):
        try:
            # Wait for the next task to finish. If the request errored out,
            # this line will raise a FailedRequest exception (caught below)
            await coro

            # If we get here, then a request succeeded. Cancel all of the tasks we started.
            finished.set()
            for r in requests:
                r.cancel()

        except (FailedRequest, asyncio.CancelledError):
            pass


async def main():
    finished = asyncio.Event()
    requests = set()

    await asyncio.gather(issue_requests(finished, requests), handle_requests(finished, requests))
    logging.info("Finished!")
                
if __name__ == '__main__':
    logging.basicConfig(stream=stdout, level=logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
    random.seed(3)
    asyncio.run(main())
  

Однако, хотя запросы запускаются, как и ожидалось, процесс не останавливается при возврате первого успешного запроса:

 2020-09-22 18:03:38,256:INFO: Sending request 0.
2020-09-22 18:03:39,264:INFO: Sending request 1.
2020-09-22 18:03:40,265:INFO: Sending request 2.
2020-09-22 18:03:40,643:ERROR: Request 0 failed after 2.39s: 500
2020-09-22 18:03:41,281:INFO: Sending request 3.
2020-09-22 18:03:42,281:INFO: Sending request 4.
2020-09-22 18:03:42,948:INFO: Request 4 succeeded (200) after 0.67s!
# requests 1, 2, and 3 should be cancelled here and the script should finish
2020-09-22 18:03:43,279:INFO: Sending request 5.
2020-09-22 18:03:43,976:ERROR: Request 2 failed after 3.71s: 500
2020-09-22 18:03:44,281:INFO: Sending request 6.
2020-09-22 18:03:44,718:ERROR: Request 1 failed after 5.45s: 500
2020-09-22 18:03:45,295:INFO: Sending request 7.
2020-09-22 18:03:46,307:INFO: Sending request 8.
...
  

Я думаю, проблема в том, что когда asyncio.as_completed(requests) вызывается handle_requests() , requests это пустой набор, поэтому as_completed() возвращает пустой итератор и handle_requests() немедленно возвращается.

Кажется, что это должно быть возможно сделать asyncio на высоком уровне, но я изо всех сил пытаюсь понять это.

Ответ №1:

Вы могли бы сделать это так, например (используя while цикл):

 is_finished = False
tasks = []

def cancel_tasks():
  for t in tasks:
    t.cancel()

async def fetch(count):
  '''Makes a request using get(), checks response, and handles cancellation'''
  logging.info(f"Sending request {count}.")
  start_time = time.perf_counter()
  try:
    response = await get()
    elapsed_time = time.perf_counter() - start_time

    if response != 200:
      logging.error(f"Request {count} failed after {elapsed_time:.2f}s: {response}")
      raise FailedRequest()
    else:
      global is_finished
      is_finished = True
      logging.info(f"Request {count} succeeded ({response}) after {elapsed_time:.2f}s!")
      cancel_tasks()

  except asyncio.CancelledError:
    logging.info(f"Cancelled request {count} after {time.perf_counter() - start_time:.2f}s.")
    raise

async def main():
  count = 0
  while not is_finished:
    tasks.append(asyncio.create_task(fetch(count)))
    await asyncio.sleep(1)
    count  = 1

  # Wait for all tasks to cancel:
  await asyncio.wait(tasks)

  logging.info("Finished!")
  

РЕДАКТИРОВАТЬ: немного улучшено, так что он отменяет все задачи как можно скорее, а затем ожидает, пока все они будут отменены, прежде чем регистрировать «Завершено»

Комментарии:

1. Это работает, хотя установка флага finished и отмена других задач внутри fetch() кажется мне запахом кода. Что, если fetch() нет видимости для других задач или is_finished ?

2. Другим недостатком этого подхода является то, что tasks список будет включать завершенные Task объекты. Если запросы постоянно возвращают коды ошибок и main() выполняются в течение длительного времени, tasks список будет очень большим, и итерация по нему для отмены небольшого количества ожидающих выполнения задач может занять нетривиальное количество времени. Этот подход требует чистого способа удаления завершенных задач из списка, поэтому cancel_tasks() работает только с теми задачами, которые все еще находятся на рассмотрении.

3. Я только что показал вам, как достичь того, чего вы хотите, вам решать, сделать ваш код более чистым и оптимизированным. Вместо tasks массива используйте словарь, в котором вы храните задачу по count ключу. Таким образом, вы сможете удалить его после сбоя. Другой оптимизацией было бы не запускать новый запрос, если у вас N количество ожидающих выполнения задач. Вместо определения логики для завершения в методе fetch() просто вызовите handle_finished метод при успешном выполнении и в этом методе отмените все задачи и установите is_finished флаг на True

4. Согласен, это правильное решение, хотя я надеялся, что этот вопрос может превратиться в обсуждение написания идиоматического асинхронного Python. Что я действительно ищу, так это решение, которое предполагает fetch() , что это произвольная функция, и не требует, чтобы она вызывала определенную функцию для обработки отмены других задач, поскольку, например fetch() , может быть предоставлен сторонним модулем. Представленное вами решение по-прежнему кажется мне неэлегантным, как будто должен быть лучший способ сделать это с использованием высокоуровневых asyncio примитивов, но я ценю ваш вклад.

5. Не уверен, как вы могли предположить, что fetch() это произвольная функция. Вы должны написать свою собственную абстракцию, чтобы достичь того, чего вы хотите. В вашем примере я думаю, что это get() метод, который можно считать произвольной функцией, которая выполняет HTTP-запрос. Возможно, вас действительно беспокоит именование методов? Я бы, вероятно, обернул весь этот код в одну вызываемую функцию fetch_persistently(url) или что-то в этом роде (в значительной степени ваш текущий main() ), переименовав ваш fetch() в fetch_attempt() . Так что для каждого fetch_persistently вызова создается отдельная переменная с ограниченной областью действия tasks

Ответ №2:

Ваши требования на первый взгляд обманчиво просты, но на самом деле требуют некоторого размышления. Если я правильно их понял, вы хотите что-то вроде этого:

 async def _spawner(async_fn, done):
    # spawn a new task every second, notifying the caller when
    # any task completes
    running = set()
    def when_done(task):
        running.remove(task)
        done.put_nowait(task)

    while True:
        new_task = asyncio.create_task(async_fn())
        new_task.add_done_callback(when_done)
        running.add(new_task)
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            # we're canceled, cancel the tasks that are still running
            for t in running:
                t.cancel()
            raise

async def try_until_successful(async_fn):
    done = asyncio.Queue()
    # run the spawner in the background so it can run
    # independently of us
    spawner_task = asyncio.create_task(_spawner(async_fn, done))

    # collect completed tasks until the one we're happy with
    while True:
        task = await done.get()
        if task.exception() is None:
            # task didn't raise - we got our result!
            spawner_task.cancel()
            return task.result()
  

Эти функции ничего не предполагают о сопрограмме, переданной как async_fn . Вы можете использовать lambda или functools.partial для создания любого вызываемого объекта, если в конечном итоге он возвращает объект сопрограммы (объект, возвращаемый простым вызовом async def и то, что вы обычно передаете await ). Кроме того, они не используют глобальные флаги, поэтому вы можете ожидать несколько экземпляров try_until_successful параллельно.

Для вызова fetch() с монотонно увеличивающимися идентификаторами вы могли бы назвать это так:

 async def main():
    cnt = 0
    async def fetch_incrementing():
        nonlocal cnt
        cnt  = 1
        await fetch(cnt)
    await try_until_successful(fetch_incrementing)
  

Комментарии:

1. Как вы думаете, стоит ли хранить дополнительный флаг в вашем while цикле, где вы используете sleep(1) ? Я имею в виду, чтобы это было while not is_done: и нет while True: . Я полагаю, вы предполагаете, что как только done.put_nowait(task) вызывается, мы синхронно переходим к нашему циклу, в котором мы вызывали task = await done.get() . Но верно ли это предположение? Я думаю await done.get() , что будет возвращать значение асинхронно, поэтому есть шанс (возможно, очень маленький), который sleep(1) будет завершен до этого, и это создаст дополнительную задачу выборки. Мысли?

2. @GProst Это очень интересный вопрос. Когда when_done срабатывает, _spawner ожидает sleep() и try_until_successful ожидает queue.get() , они являются единственными await в соответствующих сопрограммах. put_nowait вызовет queue.get() пробуждение, но это произойдет на следующей итерации цикла событий, назовем ее итерацией 1. Как вы правильно указали, возможно (если маловероятно) sleep(1) , что на итерации 1 истечет время, и в этом случае оба _spawner и try_until_successful могут быть возобновлены на этой итерации, и не указано, какой из них это будет.

3. Таким образом, возможно запланировать дополнительное fetch задание, но оно не начнет выполняться до следующей итерации, итерации 2. (Именно так работает большая часть asyncio.) И на итерации 1 возобновление try_until_successful все еще не завершено, что приведет к отмене создателя. Отмена Спавнера будет обработана на итерации 2, той же итерации, когда fetch также запускается new . Программа создания реагирует на отмену, отменяя все запущенные задачи (включая ненужные новые fetch ), но эта отмена будет обработана только на итерации 3.

4. Так что да, дополнительный fetch() запуск начнется и у него будет время, например, открыть сетевой запрос, прежде чем он будет отменен, хороший улов! Но я не уверен, что добавление условия стоит усложнять код. В настоящее время существует разделение проблем между _spawner (который порождает новые задачи до отмены) и try_until_successful (который решает, что значит быть «успешным»). Мы могли бы, конечно, передать логическое значение в штучной упаковке, которое try_ … могло бы использовать, чтобы сообщить spawner о выходе, но это кажется слишком сложным для крайнего случая системы, которая предназначена для работы с максимальными усилиями.

5. Если этот крайний случай когда-либо является проблемой для OP, я бы оставил добавление флага в качестве упражнения для читателя.