Как последовательно выполнять самостоятельные задачи, используя сельдерей?

#python #django #celery

#python #django #сельдерей

Вопрос:

Мне нужно запланировать некоторые задачи, которые кажутся очень сложными для параллельного выполнения. Они не зависят от результата друг друга, и функция ожидает 3 аргумента.

Я уже пробовал использовать методы chain, map и starmap. С помощью цепочки я получаю эту ошибку:

 [2019-04-23 15:28:00,991: ERROR/PoolWorker-3] Task proj.apps.tasks.generate[112a7426-5ac3-4cd6-8416-5591c3c018a3] raised unexpected: TypeError('get expected at least 1 arguments, got 0',)
Traceback (most recent call last):
  File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 367, in trace_task
    R = retval = fun(*args, **kwargs)
  File ".../local/lib/python2.7/site-packages/celery/app/trace.py", line 622, in __protected_call__
    return self.run(*args, **kwargs)
  File ".../tasks.py", line 966, in generate
    return res.get()
TypeError: get expected at least 1 arguments, got 0
  

С помощью map я не могу передать все аргументы, и с starmap все задачи запускаются одновременно.

 [2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
[2019-04-23 15:48:00,991: INFO/MainProcess] Received task: generate[..]
  

Пример задачи:

 @shared_task
def generate(field1, field2, field3=None):
   if field3 is not None:
      return field1   field2   field3
   return field1   field2
  

Код с использованием цепочки:

 res = chain(generate.s(i, 5, j) for i in array1 for j in array2)
return res.get()
  

Код с использованием starmap:

 arguments = [(i, 4, j) for i in array1 for j in array2]
~generate.starmap(arguments)
  

Комментарии:

1. Можете ли вы указать, как вы используете задачи?

2. Конечно! Я включу код.

3. Попробуйте использовать chain(generate.si(i, 5, j) ...)

4. Это тоже не сработало. chain(generate.si(i, 5, j)...) выдает ошибку того же типа, что и выше, и chain(generate.si(i, 5, j)...)() запускает все задачи одновременно.

5. Я знаю, (generate.si(...) | generate.si(...)).apply_async() работает, возможно, так вы пытаетесь запустить цепочку.

Ответ №1:

Все, что мне нужно было сделать, это создать цепочку, как показано ниже:

 res = chain(generate(i, 2, j)for i in array1 for j in array2)()
return res.get()
  

а затем запустите celery с дополнительным аргументом, который устанавливает максимальное количество потоков

 celery -A tasks worker --concurrency=1
  

Ответ №2:

если задачи действительно независимы, вы должны использовать .si , а не .s :

 tasks = chain(generate.si(i, 5, j) for i in array1 for j in array2)
res = tasks()
return res.get()