Пул асинхронных задач

#python-3.x #python-asyncio

#python-3.x #python-asyncio

Вопрос:

У меня есть пул задач в очереди, и я хотел бы выполнить задачи из результата вывода и поместить новые задачи. Это правильный способ сделать это? Я периодически проверяю, выполнена ли какая-либо задача. Создание подобной задачи task = asyncio.create_task(...) является асинхронным? (Может ли это заблокировать цикл, если мы создадим большое количество более сложных задач)?

 import asyncio
from collections import deque
from random import randint


async def show_result(q):
    while True:
        done_task = await q.get()
        result = done_task.result()
        print(result)
        q.task_done()



async def some_work(n, delay):
    await asyncio.sleep(delay)
    return f'task {n} with delay: {delay} completed'


async def tasks_worker(q, pool_size):
    """
    extract done tasks and put new
    """

    delay = get_task_param()
    tasks = deque([])

    while True:
        await asyncio.sleep(1)
        # append new tasks
        tasks_to_add = pool_size - len(tasks)
        print(f"tasks_to_add: {tasks_to_add}")
        if tasks_to_add > 0:
            # append new tasks
            for _ in range(tasks_to_add):
                n, d = await delay.__anext__()
                print(f"add task: {n} with delay: {d}")
                task = asyncio.create_task(some_work(n, d))
                tasks.append(task)

        for _ in range(len(tasks)):
            task = tasks.popleft()
            if task.done():
                await q.put(task)
            else:
                tasks.append(task)


async def get_task_param():
    task_n = 0
    while True:
        task_n  = 1
        await asyncio.sleep(0)
        yield task_n, randint(5, 10)


async def run(pool_size):
    q = asyncio.Queue()

    task_1 = asyncio.create_task(show_result(q))
    task_2 = asyncio.create_task(tasks_worker(q, pool_size))

    done, pending = await asyncio.wait({task_1, task_2}, return_when=asyncio.ALL_COMPLETED)
    print(done)
    print(pending)


if __name__ == '__main__':

    POOL_SIZE = 50

    try:
        asyncio.run(run(POOL_SIZE))
    except Exception as ex:
        print(ex)
  

Комментарии:

1. Ваш код не является идиоматическим. Предполагается, что вы используете очередь для передачи рабочих элементов от производителя к потребителю. Вам не нужно дополнительное deque , потому что очередь уже содержит deque. Не могли бы вы объяснить, что вам нужно сделать, может быть, есть лучший / более простой способ добиться того же?

2. Мне нужно периодически отправлять запросы post на 1000 серверов, но некоторые из них иногда не отвечают быстро (time_out = ~ 10 секунд). Я хотел бы получить ответы, которые уже были выполнены, и добавить новые задачи запросов. Я использую очередь, потому что я собираюсь запустить несколько разных task_workers и собрать выполненную работу в show_result(…)

3. Привет! Извините за отложенный ответ. Я попытался реализовать новую версию так же, как и ваше решение, но у меня много таймаутов. Возможно ли как-то показать вам две версии моих решений? Спасибо!

Ответ №1:

Мне нужно периодически отправлять запросы post на 1000 серверов, но некоторые из них иногда не отвечают быстро (time_out = ~ 10 секунд). Я хотел бы получить ответы, которые уже были выполнены, и добавить новые задачи запросов.

Вероятно, вам следует использовать одну очередь для назначения работы, а другую — для выдачи результатов. Вам не нужно добавлять рабочих динамически, вы можете добавлять задачи динамически, и пул рабочих фиксированного размера обрабатывает их параллельно по мере их поступления. Например:

 import asyncio
from random import randint

async def some_work(n, delay):
    await asyncio.sleep(delay)
    return f'task {n} with delay: {delay} completed'

async def worker(tasks, results):
    # individual worker task (sometimes called consumer)
    # - sequentially process tasks as they come into the queue
    # and emit the results
    while True:
        n, d = await tasks.get()
        result = await some_work(n, d)
        await results.put(result)

async def assigner(tasks):
    # come up with tasks dynamically and enqueue them for processing
    task_n = 0
    while True:
        await asyncio.sleep(1)
        task_n  = 1
        await tasks.put((task_n, randint(5, 10)))

async def displayer(q):
    # show results of the tasks as they arrive
    while True:
        result = await q.get()
        print(result)

async def main(pool_size):
    tasks = asyncio.Queue(100)
    results = asyncio.Queue(100)
    workers = [asyncio.create_task(worker(tasks, results))
               for _ in range(pool_size)]
    await asyncio.gather(assigner(tasks), displayer(results), *workers)

if __name__ == '__main__':
    POOL_SIZE = 50
    asyncio.run(main(POOL_SIZE))
  

Произвольно выбранные границы очереди из 100 элементов ограничивают максимальный размер очереди и обеспечивают противодавление в случае, если назначающий постоянно быстрее, чем рабочие, или в случае, если рабочие быстрее, чем дисплейер. Без привязки очередь в этом случае просто накапливала бы эти элементы, что фактически является утечкой памяти. С привязкой это заставит Queue.put , когда очередь заполнится, ждать, пока не появится свободный слот, прежде чем позволить ему продолжить.

Ответ №2:

asyncio.create_task планирует выполнение, поэтому не блокирует цикл событий.

Вместо добавления и добавления было бы проще, если бы задача запоминала их индексы list . В этом примере используется asyncio.Event для запуска остановки пополнения выполненных задач, и каждая coroutines из них обернута оболочкой для сохранения индексов.

 import asyncio
import random


async def wrapper(id_, queue, coroutine):
    await queue.put((id_, f'task {id_:2} completed after {await coroutine}'))


async def some_task():
    delay = random.uniform(0.5, 2)
    await asyncio.sleep(delay)
    return delay


async def stop_event_delayed(event: asyncio.Event):
    # insert stop value after 10 seconds
    await asyncio.sleep(6)
    print("Sending Stop signal!")
    event.set()


async def wait_until_val(queue: asyncio.Queue, end_val, callback):

    while (val := await queue.get()) is not end_val:
        queue.task_done()
        await callback(*val)

    queue.task_done()
    print("Received sentinel!")


def any_task_alive(task_list) -> bool:
    return any(not task.done() for task in task_list)


async def task_manager(pool_size):
    result_queue = asyncio.Queue()
    stop_event = asyncio.Event()

    # initialize pool with tasks
    pool = [asyncio.create_task(wrapper(idx, result_queue, some_task())) for idx in range(pool_size)]

    async def add_task_callback(idx, msg):
        # change task if event is not set
        if not stop_event.is_set():
            pool[idx] = asyncio.create_task(wrapper(idx, result_queue, some_task()))
        print(msg)
        # do some 'async' works here.

    # wait 10 secs before sending sentinel. Non-blocking, next line will run immediately.
    asyncio.create_task(stop_event_delayed(stop_event))

    # run consumer task to fetch and process tasks.
    consumer = asyncio.create_task(wait_until_val(result_queue, None, add_task_callback))

    # now wait until event, then put sentinel to stop replacing tasks.
    await stop_event.wait()
    await result_queue.put(None)

    # wait until sentinel is processed.
    await consumer


asyncio.run(task_manager(5))
  
 task  0 completed after 0.8840346369824601
task  4 completed after 0.8930680460954881
task  3 completed after 1.4510289596571013
task  2 completed after 1.6330140843418124
task  1 completed after 1.8027946604246798
task  0 completed after 1.015211866166258
task  3 completed after 0.8812113138160783
task  4 completed after 1.5566122527720068
task  1 completed after 1.1451157417516273
task  0 completed after 1.072688315915855
task  2 completed after 1.751008540755382
task  4 completed after 1.529157768479987
task  3 completed after 1.84274680022116
task  0 completed after 1.4290542578696022
task  2 completed after 1.486182291815456
task  1 completed after 1.9366319822607685
task  4 completed after 1.3291233078722962
task  3 completed after 1.454567007888481
Sending Stop signal!
Received sentinel!