#python-3.x #python-asyncio
#python-3.x #python-asyncio
Вопрос:
У меня есть пул задач в очереди, и я хотел бы выполнить задачи из результата вывода и поместить новые задачи. Это правильный способ сделать это? Я периодически проверяю, выполнена ли какая-либо задача. Создание подобной задачи task = asyncio.create_task(...)
является асинхронным? (Может ли это заблокировать цикл, если мы создадим большое количество более сложных задач)?
import asyncio
from collections import deque
from random import randint
async def show_result(q):
while True:
done_task = await q.get()
result = done_task.result()
print(result)
q.task_done()
async def some_work(n, delay):
await asyncio.sleep(delay)
return f'task {n} with delay: {delay} completed'
async def tasks_worker(q, pool_size):
"""
extract done tasks and put new
"""
delay = get_task_param()
tasks = deque([])
while True:
await asyncio.sleep(1)
# append new tasks
tasks_to_add = pool_size - len(tasks)
print(f"tasks_to_add: {tasks_to_add}")
if tasks_to_add > 0:
# append new tasks
for _ in range(tasks_to_add):
n, d = await delay.__anext__()
print(f"add task: {n} with delay: {d}")
task = asyncio.create_task(some_work(n, d))
tasks.append(task)
for _ in range(len(tasks)):
task = tasks.popleft()
if task.done():
await q.put(task)
else:
tasks.append(task)
async def get_task_param():
task_n = 0
while True:
task_n = 1
await asyncio.sleep(0)
yield task_n, randint(5, 10)
async def run(pool_size):
q = asyncio.Queue()
task_1 = asyncio.create_task(show_result(q))
task_2 = asyncio.create_task(tasks_worker(q, pool_size))
done, pending = await asyncio.wait({task_1, task_2}, return_when=asyncio.ALL_COMPLETED)
print(done)
print(pending)
if __name__ == '__main__':
POOL_SIZE = 50
try:
asyncio.run(run(POOL_SIZE))
except Exception as ex:
print(ex)
Комментарии:
1. Ваш код не является идиоматическим. Предполагается, что вы используете очередь для передачи рабочих элементов от производителя к потребителю. Вам не нужно дополнительное
deque
, потому что очередь уже содержит deque. Не могли бы вы объяснить, что вам нужно сделать, может быть, есть лучший / более простой способ добиться того же?2. Мне нужно периодически отправлять запросы post на 1000 серверов, но некоторые из них иногда не отвечают быстро (time_out = ~ 10 секунд). Я хотел бы получить ответы, которые уже были выполнены, и добавить новые задачи запросов. Я использую очередь, потому что я собираюсь запустить несколько разных task_workers и собрать выполненную работу в show_result(…)
3. Привет! Извините за отложенный ответ. Я попытался реализовать новую версию так же, как и ваше решение, но у меня много таймаутов. Возможно ли как-то показать вам две версии моих решений? Спасибо!
Ответ №1:
Мне нужно периодически отправлять запросы post на 1000 серверов, но некоторые из них иногда не отвечают быстро (time_out = ~ 10 секунд). Я хотел бы получить ответы, которые уже были выполнены, и добавить новые задачи запросов.
Вероятно, вам следует использовать одну очередь для назначения работы, а другую — для выдачи результатов. Вам не нужно добавлять рабочих динамически, вы можете добавлять задачи динамически, и пул рабочих фиксированного размера обрабатывает их параллельно по мере их поступления. Например:
import asyncio
from random import randint
async def some_work(n, delay):
await asyncio.sleep(delay)
return f'task {n} with delay: {delay} completed'
async def worker(tasks, results):
# individual worker task (sometimes called consumer)
# - sequentially process tasks as they come into the queue
# and emit the results
while True:
n, d = await tasks.get()
result = await some_work(n, d)
await results.put(result)
async def assigner(tasks):
# come up with tasks dynamically and enqueue them for processing
task_n = 0
while True:
await asyncio.sleep(1)
task_n = 1
await tasks.put((task_n, randint(5, 10)))
async def displayer(q):
# show results of the tasks as they arrive
while True:
result = await q.get()
print(result)
async def main(pool_size):
tasks = asyncio.Queue(100)
results = asyncio.Queue(100)
workers = [asyncio.create_task(worker(tasks, results))
for _ in range(pool_size)]
await asyncio.gather(assigner(tasks), displayer(results), *workers)
if __name__ == '__main__':
POOL_SIZE = 50
asyncio.run(main(POOL_SIZE))
Произвольно выбранные границы очереди из 100 элементов ограничивают максимальный размер очереди и обеспечивают противодавление в случае, если назначающий постоянно быстрее, чем рабочие, или в случае, если рабочие быстрее, чем дисплейер. Без привязки очередь в этом случае просто накапливала бы эти элементы, что фактически является утечкой памяти. С привязкой это заставит Queue.put
, когда очередь заполнится, ждать, пока не появится свободный слот, прежде чем позволить ему продолжить.
Ответ №2:
asyncio.create_task
планирует выполнение, поэтому не блокирует цикл событий.
Вместо добавления и добавления было бы проще, если бы задача запоминала их индексы list
. В этом примере используется asyncio.Event
для запуска остановки пополнения выполненных задач, и каждая coroutines
из них обернута оболочкой для сохранения индексов.
import asyncio
import random
async def wrapper(id_, queue, coroutine):
await queue.put((id_, f'task {id_:2} completed after {await coroutine}'))
async def some_task():
delay = random.uniform(0.5, 2)
await asyncio.sleep(delay)
return delay
async def stop_event_delayed(event: asyncio.Event):
# insert stop value after 10 seconds
await asyncio.sleep(6)
print("Sending Stop signal!")
event.set()
async def wait_until_val(queue: asyncio.Queue, end_val, callback):
while (val := await queue.get()) is not end_val:
queue.task_done()
await callback(*val)
queue.task_done()
print("Received sentinel!")
def any_task_alive(task_list) -> bool:
return any(not task.done() for task in task_list)
async def task_manager(pool_size):
result_queue = asyncio.Queue()
stop_event = asyncio.Event()
# initialize pool with tasks
pool = [asyncio.create_task(wrapper(idx, result_queue, some_task())) for idx in range(pool_size)]
async def add_task_callback(idx, msg):
# change task if event is not set
if not stop_event.is_set():
pool[idx] = asyncio.create_task(wrapper(idx, result_queue, some_task()))
print(msg)
# do some 'async' works here.
# wait 10 secs before sending sentinel. Non-blocking, next line will run immediately.
asyncio.create_task(stop_event_delayed(stop_event))
# run consumer task to fetch and process tasks.
consumer = asyncio.create_task(wait_until_val(result_queue, None, add_task_callback))
# now wait until event, then put sentinel to stop replacing tasks.
await stop_event.wait()
await result_queue.put(None)
# wait until sentinel is processed.
await consumer
asyncio.run(task_manager(5))
task 0 completed after 0.8840346369824601
task 4 completed after 0.8930680460954881
task 3 completed after 1.4510289596571013
task 2 completed after 1.6330140843418124
task 1 completed after 1.8027946604246798
task 0 completed after 1.015211866166258
task 3 completed after 0.8812113138160783
task 4 completed after 1.5566122527720068
task 1 completed after 1.1451157417516273
task 0 completed after 1.072688315915855
task 2 completed after 1.751008540755382
task 4 completed after 1.529157768479987
task 3 completed after 1.84274680022116
task 0 completed after 1.4290542578696022
task 2 completed after 1.486182291815456
task 1 completed after 1.9366319822607685
task 4 completed after 1.3291233078722962
task 3 completed after 1.454567007888481
Sending Stop signal!
Received sentinel!