#python #luigi
Вопрос:
У меня есть следующая настройка
class TaskA(luigi.Task):
def requires(self):
yield TaskB()
if not get_results_from_task_B_written_on_S3():
print('Did not find any results and will exit')
return
else:
print('Found results and will proceed')
yield TaskC()
results = get_results_from_task_C_written_on_S3():
# do other stuff
class TaskB(luigi.Task):
def run(self):
// process and write results to s3
def output(self):
return URITarget('b_path')
class TaskC(luigi.Task):
def run(self):
// process and write results to s3
def output(self):
return URITarget('c_path')
Журналы Luigi показывают следующее:
Did not find any results and will exit
Found results and will proceed
Мне кажется, что поток управления входит в оба if
и else
. Поскольку это в принципе невозможно, я подозреваю, что Луиджи дважды пытается запустить трубопровод. Как только он произведет это
Did not find any results and will exit
Так как он не может найти никаких результатов, записанных на s3 из TaskB
.
Затем TaskB
фактически завершает его выполнение. Записывает свои результаты на s3. TaskA
повторы. Находит результаты TaskB
на s3 и выдает
Found results and will proceed
Но тогда кажется yield
TaskC
, что это не работает. Он просто застрял там на неопределенное время.
Это всего лишь мое предположение о поведении Луиджи. Пожалуйста, дайте мне знать, если я ошибаюсь на этот счет.
Мне нужна эта модульная разбивка задач B и C на отдельные задачи, так как это значительно упрощает тестирование. TaskC
это довольно сложные задачи, настройка тестирования которых была бы намного сложнее, чем тестирование ее составляющих отдельно.
Ответ №1:
Часть проблемы заключается в том, что requires() может вызываться несколько раз во время планирования. Поэтому при первом вызове функции TaskA.requires() она выдает TaskB. Но в следующий раз, когда будет вызвана функция TaskA.requires (), вы снова выдадите TaskB и нажмете на блок else. Этот первый вызов TaskA.requires() является единственным, который используется для фактических зависимостей планирования.
Я написал тестовую программу только для того, чтобы проверить это, и вы можете видеть в моих выходных данных, сколько раз вызывается функция TaskB.output ().
import luigi
taskC_complete = False
taskB_complete = False
def get_results_from_task_C_written_on_S3():
return taskC_complete
def get_results_from_task_B_written_on_S3():
return taskB_complete
def set_taskB_complete():
taskB_complete = True
def set_taskC_complete():
taskC_complete = True
class TaskA(luigi.Task):
def requires(self):
yield TaskB()
if not get_results_from_task_B_written_on_S3():
print('Did not find any results and will exit')
return
else:
print('Found results and will proceed')
yield TaskC()
results = get_results_from_task_C_written_on_S3()
class TaskB(luigi.Task):
def run(self):
print("Task B")
def output(self):
return print('b_path')
class TaskC(luigi.Task):
def run(self):
print("Task C")
def output(self):
return print('c_path')
if __name__ == '__main__':
luigi_run_results = luigi.build([TaskA()], workers=1,
local_scheduler=True, detailed_summary=True, log_level='INFO')
Этот код выводит
Did not find any results and will exit
b_path
Task B
Did not find any results and will exit
b_path
Did not find any results and will exit
b_path
Did not find any results and will exit
b_path
Хотя код не является идеальной копией того, что вы пытаетесь сделать, вот выходные данные планировщика, которые показывают, что на самом деле будет выполняться:
INFO: Informed scheduler that task TaskA__99914b932b has status PENDING
INFO: Informed scheduler that task TaskB__99914b932b has status PENDING
Я не уверен, чего именно вы пытаетесь достичь, но ознакомьтесь с их документацией о зависимостях задач. Вам лучше попытаться выполнить другие задачи в своей функции run() для TaskA.