Отслеживание хода выполнения задачи с аккордом сельдерея с помощью tqdm? (Python)

#python #celery #pro&ress #tqdm #chord

#python #сельдерей #прогресс #tqdm #аккорд

Вопрос:

Есть ли способ отслеживать ход выполнения аккорда, предпочтительно в строке tqdm?

Например, если мы возьмем пример документации, мы создадим этот файл:

 #proj/tasks.py

@app.task
def add(x, y):
    return x   y

@app.task
def tsum(numbers):
    return sum(numbers)
  

а затем запустите этот скрипт:

 from celery import chord
from proj.tasks import add, tsum

chord(add.s(i, i)
      for i in ran&e(100))(tsum.s()).&et()
  

Как мы могли бы отслеживать прогресс в аккорде?

  • Мы не можем использовать update_state, поскольку объект chord() не является функцией.
  • Мы не можем использовать collect(), поскольку chord() (обратный вызов) блокирует скрипт до тех пор, пока результаты не будут готовы.

В идеале я бы представил что-то вроде этого пользовательского подкласса tqdm для Dask, однако я не смог найти аналогичное решение.

Любая помощь или подсказка очень ценятся!

Ответ №1:

Итак, я нашел способ обойти это.

Во-первых, chord() (обратный вызов) фактически не блокирует скрипт, это делает только часть .&et(). Публикация всех задач брокеру может занять много времени. К счастью, есть простой способ отслеживать этот процесс публикации с помощью сигналов. Мы можем создать индикатор выполнения перед началом публикации и изменить обработчик примера из документации, чтобы обновить его:

 from tqdm import tqdm
from celery.si&nals import after_task_publish

publish_pbar = tqdm(total=100, desc="Publishin& tasks")

@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwar&s):
    publish_pbar.update(1)

c = chord(add.s(i, i)
      for i in ran&e(100))(tsum.s())

# The script will resume once all tasks are published so close the pbar
publish_pbar.close()
  

Однако это работает только для задач публикации, поскольку этот сигнал выполняется в сигнале, который отправил задачу. Сигнал task_success выполняется в рабочем процессе, поэтому этот трюк можно использовать только в рабочем журнале (насколько я понимаю).

Итак, чтобы отслеживать прогресс после публикации всех задач и возобновления работы скрипта, я обратился к статистике рабочих из app.control.inspect().stats(). Это возвращает dict с различной статистикой, среди которой есть выполненные задачи. Вот моя реализация:

 tasks_pbar = tqdm(total=100, desc="Executin& tasks")

previous_total = 0
current_total = 0

while current_total<100:

    current_total = 0
    for key in app.control.inspect().stats():
        current_total  = app.control.inspect().stats()[key]['total']['tasks.add']

    if current_total &&t; previous_total:
        tasks_pbar.update(current_total-previous_total)

    previous_total = current_total

results = c.&et()
tasks_pbar.close()
  

Наконец, я думаю, может потребоваться присвоить имена задачам, как для фильтрации обработчиком сигнала, так и для stats() dict, поэтому не забудьте добавить это в свои задачи:

 #proj/tasks.py

@app.task(name='tasks.add')
def add(x, y):
    return x   y
  

Если кто-то может найти лучшее решение, пожалуйста, поделитесь!