Отслеживать прогресс участников аккорда

#celery #celery-task

Вопрос:

Есть ли способ получить task_id s всех участников аккорда? Моя цель-отслеживать ход выполнения каждой задачи аккордов. Вот моя попытка до сих пор:

 import random
import time
from typing import List

import celery
from celery import Celery, chord
from celery.utils.log import get_task_logger

app = Celery(
    "chaintasks",
    backend="redis://localhost:6379",
    broker="pyamqp://guest@localhost//",
)
app.conf.update(task_track_started=True, result_persistent=True)


@app.task(bind=True)
def process_item(self: celery.Task, item: str) -> str:
    t = random.randint(0, 10)
    for i in range(t):
        chord_id = self.request.chord["options"]["task_id"]
        self.update_state(
            meta={"progress": i / t},  # How do I access this progress value?
        )
        self.update_state(      # This way I write the group_id in the main task info,
            task_id=chord_id,   # but maybe I don't even need this?
            meta={"group_id": self.request.group},
            state="STARTED"
        )
        time.sleep(1)
    return item   "_processed"


@app.task()
def group_items(items: List[str]) -> str:
    return ":".join(items)


@app.task
def process_and_group(items: List[str]):
    task = chord((process_item.s(i) for i in items), group_items.s())
    res = task.delay()
    return res.id


random.seed(42)

if __name__ == "__main__":
    all_items = ["item1", "item2", "item3", "item4"]
    res1 = process_and_group.delay(all_items)
    while True:
        subtask_id = res1.result
        if subtask_id is None:
            print("initial", res1.state)
        else:
            res2 = app.AsyncResult(subtask_id)
            print("subtask state", res2.state)
            info = res2.info
            print("subtask info", res2.info)
            if isinstance(res2.info, dict):
                group_id = info.get("group_id")
                if group_id is not None:
                    group_res = app.GroupResult(group_id)
                    # Ideally here, I would like to be able to track the progress all the chords tasks,
                    # something like an average of all progresses would be OK.
                    print("group results", group_res.results)  # group_res.results is None, unfortunately
            if res2.state == "SUCCESS":
                break
        time.sleep(0.5)
    print("subtask result", res2.result)
 

Как мне получить что-то из этого self.request.group UUID? Есть ли способ отслеживать ход выполнения подзадач аккордов? Я открыт для любых модификаций этого MWE, чтобы заставить его работать.