Запуск задачи из другой периодической задачи с помощью сельдерея

#python #celery

#питон #сельдерей #python

Вопрос:

У меня есть периодическая задача, которая должна вызвать другую задачу. Окончательное ожидаемое поведение: первая задача должна собрать некоторые данные из внешней службы, а затем выполнить цикл по этим данным (списку) и вызвать другую задачу с передачей аргумента (текущая итерация в цикле). Я хочу, чтобы эти задачи в цикле были асинхронными.

Я написал код, который запускает задачу в period, но я не могу понять, как эта задача должна вызывать другую задачу, потому что, когда я делаю это .delay() методом, ничего не происходит.

Вот несколько упрощенных кодов, которые я хочу запустить:

 @celery_app.task(name="Hello World")
def hello_world():
    print(f"HELLO WORLD PRINT")
    add.delay(2, 2)
    return 'Hello'


@celery_app.task
def add(x, y):
    with open(f"./{str(datetime.datetime.now())}.txt", 'w') as file:
        file.write(str(x y))
    print(f"x   y = {x   y}")
    return x   y
  

На данный момент hello_world() выполняется каждые 30 секунд, и в результате я получаю ПЕЧАТЬ HELLO WORLD в журналах, но задача добавления не выполняется. Я не вижу ни печати, ни файла, которые должны быть созданы этой задачей.

Обновление для комментариев, вот как я использую очередь:

 celery_app.conf.task_routes = {
    "project.app.hello_world": {
        "queue": 'test_queue'
    },
    "project.app.add": {
        "queue": 'test_queue'
    },
  

Комментарии:

1. Используете ли вы какую-то конкретную очередь вместо очереди по умолчанию?

2. Я добавил код, в котором я создаю очередь, вы можете посмотреть обновленный пост.

3. Я получил тот же результат после того, как удалил код маршрутов этой задачи.

4. Попробуйте явно отправить задачу в эту очередь с помощью .apply_async(… queue=»test_queue») вы должны дважды проверить, в какую очередь попадают ваши задачи. Вы также можете попробовать поместить queue="test_queue" аннотации в свою задачу.

Ответ №1:

Существует несколько способов решения проблемы.

Очевидный вариант — поместить имя очереди в .apply_async для примера add.apply_async(10, 10, queue="test_queue") .

Другое решение — поместить очередь в аннотацию задачи, т.Е. @celery_app.task(queue="test_queue") .

Я никогда не настраивал task_routes, но я считаю, что его можно указать там, как вы пытались…