#python #django #celery
#python #django #сельдерей
Вопрос:
Мне нужно запланировать некоторые задачи, которые кажутся очень сложными для параллельного выполнения. Они не зависят от результата друг друга, и функция ожидает 3 аргумента.
Я уже пробовал использовать методы chain, map и starmap. С помощью цепочки я получаю эту ошибку:
[2019-04-23 15:28:00,991: ERROR/PoolWorker-3] Task proj.apps.tasks.generate[112a7426-5ac3-4cd6-8416-5591c3c018a3] raised unexpected: TypeError('get expected at least 1 arguments, got 0',)
Traceback (most recent call last):
File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 367, in trace_task
R = retval = fun(*args, **kwargs)
File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 622, in __protected_call__
return self.run(*args, **kwargs)
File ".../tasks.py", line 966, in generate
return res.get()
TypeError: get expected at least 1 arguments, got 0
С помощью map
я не могу передать все аргументы, и с starmap
все задачи запускаются одновременно.
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
Пример задачи:
@shared_task
def generate(field1, field2, field3=None):
if field3 is not None:
return field1 field2 field3
return field1 field2
Код с использованием цепочки:
res = chain(generate.s(i, 5, j) for i in array1 for j in array2)
return res.get()
Код с использованием starmap:
arguments = [(i, 4, j) for i in array1 for j in array2]
~generate.starmap(arguments)
Комментарии:
1. Можете ли вы указать, как вы используете задачи?
2. Конечно! Я включу код.
3. Попробуйте использовать
chain(generate.si(i, 5, j) ...)
4. Это тоже не сработало.
chain(generate.si(i, 5, j)...)
выдает ошибку того же типа, что и выше, иchain(generate.si(i, 5, j)...)()
запускает все задачи одновременно.5. Я знаю,
(generate.si(...) | generate.si(...)).apply_async()
работает, возможно, так вы пытаетесь запустить цепочку.
Ответ №1:
Все, что мне нужно было сделать, это создать цепочку, как показано ниже:
res = chain(generate(i, 2, j)for i in array1 for j in array2)()
return res.get()
а затем запустите celery с дополнительным аргументом, который устанавливает максимальное количество потоков
celery -A tasks worker --concurrency=1
Ответ №2:
если задачи действительно независимы, вы должны использовать .si
, а не .s
:
tasks = chain(generate.si(i, 5, j) for i in array1 for j in array2)
res = tasks()
return res.get()