Пользовательская задача сельдерея с дополнительными параметрами

#python #python-3.x #celery

Вопрос:

В настоящее время я работаю над проектом со множеством задач сельдерея, и каждая из них имеет сложные правила повторения.

Мы настраиваем max_retries и retry_backoffice используем переменные env, и если max_retries они были превышены, то мы помещаем сообщение в «резервную очередь», например example_task.apply_async(queue=settings.EXAMPLE_FALLBACK_QUEUE, kwargs=kwargs) . Проблема в том, что мы дублируем тонны кода и путаем наши задачи с таким кодом, как:

 @app.task
def example_task(**kwargs):
    try:
         ''' The real task business '''
         ...
    except Exception as e:
         # Here we have some logs
         # Also some checks about the nature of e and how to proceed

         # some cases we run:
         example_task.retry(
            exc=e, max_retries=max_retries, countdown=countdown
         )

         # and other cases we have
         example_task.apply_async(queue=settings.example_task_fallback_queue, kwargs=kwargs)
 

Моя идея состоит в том, чтобы расширить celery.Task класс некоторыми дополнительными параметрами и сделать что-то вроде этого:

 @app.task(@app.task(ignore_result=False, bind=True, base=RetryTask(place_an_order_config))
def example_task(kwargs):
    '''Just the real business here'''
 

и реализовать такой класс, как:

 @dataclass
class RetryConfig:
    fallback_queue: str
    max_retries: int
    retry_backoff: int
    event_name: str
    traced_exceptions: List[Any]


class RetryTask(celery.Task):
    def __init__(self, config: RetryConfig):
        super().__init__()
        self.config = config

    def __call__(self, *args, **kwargs):
        try:
            self.run(*args, **kwargs)
        except Exception as e:
            if e in self.config.traced_exceptions:
                # Logs
                return

            # And here implement the retry/fallback logic
            # Using the parameterization attr `self.config` to
            # setup
 

Проблема в том, что arg base в app.task ожидает класс ( и сельдерей внутренне создает экземпляр класса), а не объект, поэтому я изменил RetryTask класс на что-то вроде этого:

 class RetryTask:  # Dont inhirit directly from celery.Task
    def __init__(self, config: RetryConfig):
        self = celery.Task()
        self.config = config

    def __call__(self, *args, **kwargs):
        try:
            self.run(*args, **kwargs)
        except Exception as e:
            if e in self.config.traced_exceptions:
                # Logs
                return

            # And here implement the retry/fallback logic
            # Using the parameterization attr `self.config` to
            # setup

 

That way I can pass a RetryTask object to app.task decorator and parametrize my task.
Unfortunately the above code is throwing TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases when the app.task function checks task = type(fun.__name__, (base,), dict(...) .

So my question is: How can I parametrize my custom RetryTask class the right way ?