#python #celery
#питон #сельдерей #python
Вопрос:
У меня есть периодическая задача, которая должна вызвать другую задачу. Окончательное ожидаемое поведение: первая задача должна собрать некоторые данные из внешней службы, а затем выполнить цикл по этим данным (списку) и вызвать другую задачу с передачей аргумента (текущая итерация в цикле). Я хочу, чтобы эти задачи в цикле были асинхронными.
Я написал код, который запускает задачу в period, но я не могу понять, как эта задача должна вызывать другую задачу, потому что, когда я делаю это .delay()
методом, ничего не происходит.
Вот несколько упрощенных кодов, которые я хочу запустить:
@celery_app.task(name="Hello World")
def hello_world():
print(f"HELLO WORLD PRINT")
add.delay(2, 2)
return 'Hello'
@celery_app.task
def add(x, y):
with open(f"./{str(datetime.datetime.now())}.txt", 'w') as file:
file.write(str(x y))
print(f"x y = {x y}")
return x y
На данный момент hello_world()
выполняется каждые 30 секунд, и в результате я получаю ПЕЧАТЬ HELLO WORLD в журналах, но задача добавления не выполняется. Я не вижу ни печати, ни файла, которые должны быть созданы этой задачей.
Обновление для комментариев, вот как я использую очередь:
celery_app.conf.task_routes = {
"project.app.hello_world": {
"queue": 'test_queue'
},
"project.app.add": {
"queue": 'test_queue'
},
Комментарии:
1. Используете ли вы какую-то конкретную очередь вместо очереди по умолчанию?
2. Я добавил код, в котором я создаю очередь, вы можете посмотреть обновленный пост.
3. Я получил тот же результат после того, как удалил код маршрутов этой задачи.
4. Попробуйте явно отправить задачу в эту очередь с помощью .apply_async(… queue=»test_queue») вы должны дважды проверить, в какую очередь попадают ваши задачи. Вы также можете попробовать поместить
queue="test_queue"
аннотации в свою задачу.
Ответ №1:
Существует несколько способов решения проблемы.
Очевидный вариант — поместить имя очереди в .apply_async для примера add.apply_async(10, 10, queue="test_queue")
.
Другое решение — поместить очередь в аннотацию задачи, т.Е. @celery_app.task(queue="test_queue")
.
Я никогда не настраивал task_routes, но я считаю, что его можно указать там, как вы пытались…