#celery
#сельдерей
Вопрос:
Я пытаюсь заставить сельдерей маршрутизировать задачи на основе имени задачи … в принципе, у меня есть задачи с именами ‘worker.some_name’ и ‘web.some_name’, и я использую две разные очереди, называемые worker и web соответственно. Я бы хотел, чтобы все рабочие задачи отправлялись в рабочую очередь и наоборот. В настоящее время у меня есть большой словарь CELERY_ROUTES, подобный этому:
CELERY_ROUTES = {
"web.some_name": {
"queue": "web"
},
"web.some_other_name": {
"queue": "web"
},
etc.... }
Но я хотел бы что-то более общее, например:
CELERY_ROUTES = (MyRouter(), )
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task.split('.')[0] == "worker":
return {"queue": "worker"}
return {"queue": "web"}
Но, похоже, это не работает. Есть идеи? Спасибо.
Ответ №1:
Вы, должно быть, использовали декоратор «@app.task» для задачи, которую вы определили в файле py.
Вы можете маршрутизировать свою задачу с помощью @app.task(queue=’queue_name’)
Комментарии:
1. Это спасло мой день. Не будут ли маршрутизации работать при использовании этого декоратора?
2. Это может перейти в очередь по умолчанию, которую создает сельдерей с именем «сельдерей». Старайтесь избегать этого.
Ответ №2:
Вы должны быть в состоянии делать то, что хотите, изменив тип обмена с прямого на тему. Таким образом, вы можете указать задачи как web. * или worker.*
Вы можете прочитать об этом здесь: http://ask.github.com/celery/userguide/routing.html#topic-exchanges
Комментарии:
1. Эта ссылка мертва, вот хороший источник со всей информацией, упомянутой Дэном: celery.readthedocs.org/en/latest/userguide /…
2. Будет ли это работать с redis в качестве посредника сообщений? Я не могу заставить его работать с примером маршрутизации вручную, используя подстановочные знаки в celery.readthedocs.org/en/latest/userguide /…
Ответ №3:
Маршрутизация по шаблону не поддерживается в Celery 3.x по умолчанию, но вы можете реализовать ее самостоятельно.
Вот решение для копирования и вставки:
class TaskRouter:
def __init__(self, routes):
self.routes = {}
self.glob_routes = {}
for glob, queue in routes.items():
if '*' in glob:
self.glob_routes[glob] = queue
else:
self.routes[glob] = queue
def route_for_task(self, task, args=None, kwargs=None):
if task in self.routes:
return self.routes[task]
for route in self.glob_routes:
prefix = route.split('*')[0]
if task.startswith(prefix):
return self.glob_routes[route]
return None # for unknown tasks will be used default queue
Использование:
# celery.py
CELERY_ROUTES = {
'web.*': 'web',
'web.slow_task': 'slow',
'worker.*': 'worker',
}
app = Celery('config')
app.config_from_object('django.conf:settings') # Django or your app config
app.conf.update(
CELERY_ROUTES=(TaskRouter(CELERY_ROUTES),),
)
Как работает TaskRouter:
In [2]: CELERY_ROUTES = {
...: 'web.*': 'web',
...: 'web.slow_task': 'slow',
...: 'worker.*': 'worker',
...: }
In [3]: router = TaskRouter(CELERY_ROUTES)
In [4]: router.route_for_task('web.blabla')
Out[4]: 'web'
In [5]: router.route_for_task('web.slow_task')
Out[5]: 'slow'
In [6]: router.route_for_task('unknown_task') # None = default queue
In [7]: router.route_for_task('worker.foo')
Out[7]: 'worker'
In [8]: router.route_for_task('worker.bar')
Out[8]: 'worker'