Противоречивые аргументы обратного вызова сельдерея

#python #callback #celery

Вопрос:

Пишу интеграцию сельдерея в Django, и меня смущает поведение сельдерея при связывании обратных вызовов с изменяемыми сигнатурами. Идея заключается в том, что main_task создается объект (например, a flow ), которому он передает свой идентификатор task_one . task_one принимает аргумент с помощью изменяемой подписи и печатает идентификатор, созданный в main_task .

По завершении task_one , alt_task называется. Пока все хорошо. Однако в alt_task новом объекте создается и возвращается этот идентификатор. Затем я хочу, чтобы мне передали новый идентификатор для напоминания о цепочке. Происходит то, что передается идентификатор первого объекта, в то время как я хочу, чтобы он был заменен идентификатором нового объекта.

 @shared_task() def alt_task():  flow = Flow.objects.create()  return flow.id   @shared_task() def task_one(flow_id):  flow = Flow.objects.get(id=flow_id)  print(flow.id)  return flow.id   @shared_task() def main_task(flow_id):  flow = Flow.objects.create()  ...  task_one.apply_async(  args=(flow.id,),  link=(alt_task.si(), task_one.s(), ...),  link_error=log_error.s()  )  # prints 1 1  # what I want 1 2  

Ответ №1:

Я не знаю, почему вышесказанное не передает аргументы линейно, и если кто-нибудь знает это полностью, я бы с радостью установил это как правильный ответ. В то же время, что я сделал, чтобы смягчить это, так это разделил вызов на группу сельдерея. Первой записью в группе являются задачи, которым требуется идентификатор первого объекта потока, и цепочка задач, использующих идентификатор, созданный в alt_task

 ... task_chain = (alt_task.si() | task_one.s() | ...) job = group([alt_task.si(flow.id), task_chain]) job.apply_async()   1 2