#celery #celery-task
Вопрос:
Есть ли способ получить task_id
s всех участников аккорда? Моя цель-отслеживать ход выполнения каждой задачи аккордов. Вот моя попытка до сих пор:
import random
import time
from typing import List
import celery
from celery import Celery, chord
from celery.utils.log import get_task_logger
app = Celery(
"chaintasks",
backend="redis://localhost:6379",
broker="pyamqp://guest@localhost//",
)
app.conf.update(task_track_started=True, result_persistent=True)
@app.task(bind=True)
def process_item(self: celery.Task, item: str) -> str:
t = random.randint(0, 10)
for i in range(t):
chord_id = self.request.chord["options"]["task_id"]
self.update_state(
meta={"progress": i / t}, # How do I access this progress value?
)
self.update_state( # This way I write the group_id in the main task info,
task_id=chord_id, # but maybe I don't even need this?
meta={"group_id": self.request.group},
state="STARTED"
)
time.sleep(1)
return item "_processed"
@app.task()
def group_items(items: List[str]) -> str:
return ":".join(items)
@app.task
def process_and_group(items: List[str]):
task = chord((process_item.s(i) for i in items), group_items.s())
res = task.delay()
return res.id
random.seed(42)
if __name__ == "__main__":
all_items = ["item1", "item2", "item3", "item4"]
res1 = process_and_group.delay(all_items)
while True:
subtask_id = res1.result
if subtask_id is None:
print("initial", res1.state)
else:
res2 = app.AsyncResult(subtask_id)
print("subtask state", res2.state)
info = res2.info
print("subtask info", res2.info)
if isinstance(res2.info, dict):
group_id = info.get("group_id")
if group_id is not None:
group_res = app.GroupResult(group_id)
# Ideally here, I would like to be able to track the progress all the chords tasks,
# something like an average of all progresses would be OK.
print("group results", group_res.results) # group_res.results is None, unfortunately
if res2.state == "SUCCESS":
break
time.sleep(0.5)
print("subtask result", res2.result)
Как мне получить что-то из этого self.request.group
UUID? Есть ли способ отслеживать ход выполнения подзадач аккордов? Я открыт для любых модификаций этого MWE, чтобы заставить его работать.