Добавление очереди сельдерея на этапе начальной загрузки

#python #celery

#python #сельдерей

Вопрос:

Я хочу сделать две вещи

  1. Я сделал рабочий загрузочный шаг, который загружает конфигурацию с удаленного.
  2. Я хочу получить параметр из конфигурации и добавить очередь, указав его в имени очереди потребителя.

app.steps[‘worker’].add(LoadConfig)

работает идеально

но я не могу выполнить шаг загрузки SetQueue

просто мой SetQueue теперь выглядит так:

 class SetQueue(bootsteps.StartStopStep):

    requires = (Consumer, )

    def start(self, parent, **kwargs):

        parent.add_task_queue('q_name', exchange='q_name', routing_key='q_name')

app.steps['consumer'].add(SetQueue)
 

это не работает.

Я думаю, моя проблема в том, что я не понимаю — в какой момент (требуется = (???, )) можно добавлять очереди.

Ответ №1:

В сельдерее существуют разные «схемы», где каждая схема состоит из нескольких шагов начальной загрузки.

Первым элементом для запуска является рабочий, который, например, запускает пул выполнения, последним элементом для запуска является Consumer , который подключается к брокеру и запускает, среди прочего, потребителя задачи.

Таким образом, ваш requires не указывает на этап начальной загрузки, он указывает на схему.

Я вижу, что ваш шаг начальной загрузки добавляет еще одну очередь для использования, поэтому было бы разумнее, чтобы он зависел от Tasks шага начальной загрузки, который должен быть запущен, если вы хотите вызвать add_task_queue .

 class SetQueue(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Tasks', )
 

Вы можете увидеть обзор компонентов запуска здесь:
http://docs.celeryproject.org/en/latest/userguide/extending.html#blueprints

Кроме того, зарегистрировав пользовательские шаги начальной загрузки, вы можете сгенерировать новую версию этого графика с помощью:

 $ celery -A proj graph bootsteps | dot -Tpng -o steps.png