CELERY_ROUTES — как маршрутизировать на основе имени задачи

#celery

#сельдерей

Вопрос:

Я пытаюсь заставить сельдерей маршрутизировать задачи на основе имени задачи … в принципе, у меня есть задачи с именами ‘worker.some_name’ и ‘web.some_name’, и я использую две разные очереди, называемые worker и web соответственно. Я бы хотел, чтобы все рабочие задачи отправлялись в рабочую очередь и наоборот. В настоящее время у меня есть большой словарь CELERY_ROUTES, подобный этому:

 CELERY_ROUTES = {
    "web.some_name": {
        "queue": "web"
    },
    "web.some_other_name": {
        "queue": "web"
    },
    etc.... }
 

Но я хотел бы что-то более общее, например:

 CELERY_ROUTES = (MyRouter(), ) 
class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.split('.')[0] == "worker":
            return {"queue": "worker"}
        return {"queue": "web"}
 

Но, похоже, это не работает. Есть идеи? Спасибо.

Ответ №1:

Вы, должно быть, использовали декоратор «@app.task» для задачи, которую вы определили в файле py.

Вы можете маршрутизировать свою задачу с помощью @app.task(queue=’queue_name’)

Комментарии:

1. Это спасло мой день. Не будут ли маршрутизации работать при использовании этого декоратора?

2. Это может перейти в очередь по умолчанию, которую создает сельдерей с именем «сельдерей». Старайтесь избегать этого.

Ответ №2:

Вы должны быть в состоянии делать то, что хотите, изменив тип обмена с прямого на тему. Таким образом, вы можете указать задачи как web. * или worker.*

Вы можете прочитать об этом здесь: http://ask.github.com/celery/userguide/routing.html#topic-exchanges

Комментарии:

1. Эта ссылка мертва, вот хороший источник со всей информацией, упомянутой Дэном: celery.readthedocs.org/en/latest/userguide /…

2. Будет ли это работать с redis в качестве посредника сообщений? Я не могу заставить его работать с примером маршрутизации вручную, используя подстановочные знаки в celery.readthedocs.org/en/latest/userguide /…

Ответ №3:

Маршрутизация по шаблону не поддерживается в Celery 3.x по умолчанию, но вы можете реализовать ее самостоятельно.

Вот решение для копирования и вставки:

 class TaskRouter:
    def __init__(self, routes):
        self.routes = {}
        self.glob_routes = {}

        for glob, queue in routes.items():
            if '*' in glob:
                self.glob_routes[glob] = queue
            else:
                self.routes[glob] = queue

    def route_for_task(self, task, args=None, kwargs=None):
        if task in self.routes:
            return self.routes[task]

        for route in self.glob_routes:
            prefix = route.split('*')[0]
            if task.startswith(prefix):
                return self.glob_routes[route]

        return None # for unknown tasks will be used default queue
 

Использование:

 # celery.py
CELERY_ROUTES = {
    'web.*':            'web',
    'web.slow_task':    'slow',
    'worker.*':         'worker',
}

app = Celery('config')
app.config_from_object('django.conf:settings') # Django or your app config

app.conf.update(
    CELERY_ROUTES=(TaskRouter(CELERY_ROUTES),),
)
 

Как работает TaskRouter:

 In [2]: CELERY_ROUTES = { 
   ...:     'web.*':            'web', 
   ...:     'web.slow_task':    'slow', 
   ...:     'worker.*':         'worker', 
   ...: }                                                                       

In [3]: router = TaskRouter(CELERY_ROUTES)                                      

In [4]: router.route_for_task('web.blabla')                                     
Out[4]: 'web'

In [5]: router.route_for_task('web.slow_task')                                  
Out[5]: 'slow'

In [6]: router.route_for_task('unknown_task')  # None = default queue                                 

In [7]: router.route_for_task('worker.foo')                                     
Out[7]: 'worker'

In [8]: router.route_for_task('worker.bar')                                     
Out[8]: 'worker'