Как получить расчетное время, необходимое для выполнения задачи по сельдерею?

#python #rabbitmq #celery #fastapi

Вопрос:

Я пытаюсь получить расчетное время, необходимое для выполнения задачи с сельдереем. Например, если задача A запускается на интерфейсе пользователем (возможно, нажатием кнопки, которая запускает вызов REST API на серверную часть), он отобразит таймер, показывающий, сколько времени потребуется для завершения задачи.

Если я правильно прочитал документацию Сельдерея, я могу получить ETA для запланированных задач. Но как насчет текущей задачи, которая только что была выполнена?

Первоначальная идея, которая у меня была, состояла в том, чтобы сохранить время выполнения аналогичных задач, которые выполнялись ранее, в базе данных и, вероятно, сделать среднее значение, которое действует как расчетное необходимое время. Было бы здорово, если бы кто-нибудь мог поделиться некоторым опытом в приведенном выше примере, если бы он/она это сделал. Большое спасибо!

Ответ №1:

Способ 1: Используйте декоратор времени

 import time
import functools


def timer(func):
    @functools.wraps(func)
    def _wrapper(*args, **kwargs):
        start = time.perf_counter()
        val = func(*args, **kwargs)
        end = time.perf_counter()
        elapsed_time = end - start
        task_info = {"time": elapsed_time, task_args: args, task_kwargs: kwargs}
        # do something with this task_info, i.e log or save in db to be used for analysis later
        return val

    return _wrapper


@app.task(name="task1")
@timer
def task1(...):
  ...
 

Способ 2: Используйте сигналы сельдерея

 import time
from celery.signals import task_prerun, task_postrun

tasks = {}


@task_prerun.connect
def task_prerun_handler(task_id, task, **extras):
    """ Dispatched before a task is executed. """
    tasks[task_id] = time.perf_counter()


@task_postrun.connect
def task_postrun_handler(task_id, task, **extras):
    """ Dispatched after a task has been executed. """
    end = time.perf_counter()
    elapsed_time = end - tasks.pop(task_id)
    # do something with this task_info, i.e log or save in db to be used for analysis later
 

Ресурсы, на которые вы можете посмотреть:

Комментарии:

1. Спасибо за рекомендацию @Rupsi ! Но я предполагаю, что расчетное время будет основываться на исторических прогонах предыдущих задач, а не на оценке в реальном времени, верно? Это означает, что есть ли способ, при котором работник может просто указать расчетное время выполнения, не основываясь на прошлых запусках, а просто исключительно на ресурсах, необходимых для выполнения этой конкретной задачи?