#python #callback #celery
Вопрос:
Пишу интеграцию сельдерея в Django, и меня смущает поведение сельдерея при связывании обратных вызовов с изменяемыми сигнатурами. Идея заключается в том, что main_task
создается объект (например, a flow
), которому он передает свой идентификатор task_one
. task_one
принимает аргумент с помощью изменяемой подписи и печатает идентификатор, созданный в main_task
.
По завершении task_one
, alt_task
называется. Пока все хорошо. Однако в alt_task
новом объекте создается и возвращается этот идентификатор. Затем я хочу, чтобы мне передали новый идентификатор для напоминания о цепочке. Происходит то, что передается идентификатор первого объекта, в то время как я хочу, чтобы он был заменен идентификатором нового объекта.
@shared_task() def alt_task(): flow = Flow.objects.create() return flow.id @shared_task() def task_one(flow_id): flow = Flow.objects.get(id=flow_id) print(flow.id) return flow.id @shared_task() def main_task(flow_id): flow = Flow.objects.create() ... task_one.apply_async( args=(flow.id,), link=(alt_task.si(), task_one.s(), ...), link_error=log_error.s() ) # prints 1 1 # what I want 1 2
Ответ №1:
Я не знаю, почему вышесказанное не передает аргументы линейно, и если кто-нибудь знает это полностью, я бы с радостью установил это как правильный ответ. В то же время, что я сделал, чтобы смягчить это, так это разделил вызов на группу сельдерея. Первой записью в группе являются задачи, которым требуется идентификатор первого объекта потока, и цепочка задач, использующих идентификатор, созданный в alt_task
... task_chain = (alt_task.si() | task_one.s() | ...) job = group([alt_task.si(flow.id), task_chain]) job.apply_async() 1 2