#python #celery #pro&ress #tqdm #chord
#python #сельдерей #прогресс #tqdm #аккорд
Вопрос:
Есть ли способ отслеживать ход выполнения аккорда, предпочтительно в строке tqdm?
Например, если мы возьмем пример документации, мы создадим этот файл:
#proj/tasks.py
@app.task
def add(x, y):
return x y
@app.task
def tsum(numbers):
return sum(numbers)
а затем запустите этот скрипт:
from celery import chord
from proj.tasks import add, tsum
chord(add.s(i, i)
for i in ran&e(100))(tsum.s()).&et()
Как мы могли бы отслеживать прогресс в аккорде?
- Мы не можем использовать update_state, поскольку объект chord() не является функцией.
- Мы не можем использовать collect(), поскольку chord() (обратный вызов) блокирует скрипт до тех пор, пока результаты не будут готовы.
В идеале я бы представил что-то вроде этого пользовательского подкласса tqdm для Dask, однако я не смог найти аналогичное решение.
Любая помощь или подсказка очень ценятся!
Ответ №1:
Итак, я нашел способ обойти это.
Во-первых, chord() (обратный вызов) фактически не блокирует скрипт, это делает только часть .&et(). Публикация всех задач брокеру может занять много времени. К счастью, есть простой способ отслеживать этот процесс публикации с помощью сигналов. Мы можем создать индикатор выполнения перед началом публикации и изменить обработчик примера из документации, чтобы обновить его:
from tqdm import tqdm
from celery.si&nals import after_task_publish
publish_pbar = tqdm(total=100, desc="Publishin& tasks")
@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwar&s):
publish_pbar.update(1)
c = chord(add.s(i, i)
for i in ran&e(100))(tsum.s())
# The script will resume once all tasks are published so close the pbar
publish_pbar.close()
Однако это работает только для задач публикации, поскольку этот сигнал выполняется в сигнале, который отправил задачу. Сигнал task_success выполняется в рабочем процессе, поэтому этот трюк можно использовать только в рабочем журнале (насколько я понимаю).
Итак, чтобы отслеживать прогресс после публикации всех задач и возобновления работы скрипта, я обратился к статистике рабочих из app.control.inspect().stats(). Это возвращает dict с различной статистикой, среди которой есть выполненные задачи. Вот моя реализация:
tasks_pbar = tqdm(total=100, desc="Executin& tasks")
previous_total = 0
current_total = 0
while current_total<100:
current_total = 0
for key in app.control.inspect().stats():
current_total = app.control.inspect().stats()[key]['total']['tasks.add']
if current_total &&t; previous_total:
tasks_pbar.update(current_total-previous_total)
previous_total = current_total
results = c.&et()
tasks_pbar.close()
Наконец, я думаю, может потребоваться присвоить имена задачам, как для фильтрации обработчиком сигнала, так и для stats() dict, поэтому не забудьте добавить это в свои задачи:
#proj/tasks.py
@app.task(name='tasks.add')
def add(x, y):
return x y
Если кто-то может найти лучшее решение, пожалуйста, поделитесь!