#python #python-3.x #celery
Вопрос:
В настоящее время я работаю над проектом со множеством задач сельдерея, и каждая из них имеет сложные правила повторения.
Мы настраиваем max_retries
и retry_backoffice
используем переменные env, и если max_retries
они были превышены, то мы помещаем сообщение в «резервную очередь», например example_task.apply_async(queue=settings.EXAMPLE_FALLBACK_QUEUE, kwargs=kwargs)
. Проблема в том, что мы дублируем тонны кода и путаем наши задачи с таким кодом, как:
@app.task
def example_task(**kwargs):
try:
''' The real task business '''
...
except Exception as e:
# Here we have some logs
# Also some checks about the nature of e and how to proceed
# some cases we run:
example_task.retry(
exc=e, max_retries=max_retries, countdown=countdown
)
# and other cases we have
example_task.apply_async(queue=settings.example_task_fallback_queue, kwargs=kwargs)
Моя идея состоит в том, чтобы расширить celery.Task
класс некоторыми дополнительными параметрами и сделать что-то вроде этого:
@app.task(@app.task(ignore_result=False, bind=True, base=RetryTask(place_an_order_config))
def example_task(kwargs):
'''Just the real business here'''
и реализовать такой класс, как:
@dataclass
class RetryConfig:
fallback_queue: str
max_retries: int
retry_backoff: int
event_name: str
traced_exceptions: List[Any]
class RetryTask(celery.Task):
def __init__(self, config: RetryConfig):
super().__init__()
self.config = config
def __call__(self, *args, **kwargs):
try:
self.run(*args, **kwargs)
except Exception as e:
if e in self.config.traced_exceptions:
# Logs
return
# And here implement the retry/fallback logic
# Using the parameterization attr `self.config` to
# setup
Проблема в том, что arg base
в app.task
ожидает класс ( и сельдерей внутренне создает экземпляр класса), а не объект, поэтому я изменил RetryTask
класс на что-то вроде этого:
class RetryTask: # Dont inhirit directly from celery.Task
def __init__(self, config: RetryConfig):
self = celery.Task()
self.config = config
def __call__(self, *args, **kwargs):
try:
self.run(*args, **kwargs)
except Exception as e:
if e in self.config.traced_exceptions:
# Logs
return
# And here implement the retry/fallback logic
# Using the parameterization attr `self.config` to
# setup
That way I can pass a RetryTask
object to app.task
decorator and parametrize my task.
Unfortunately the above code is throwing TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
when the app.task
function checks task = type(fun.__name__, (base,), dict(...)
.
So my question is: How can I parametrize my custom RetryTask
class the right way ?