Луиджи: Задача никогда не вызывается

#python #luigi

Вопрос:

У меня есть следующая настройка

 class TaskA(luigi.Task):

   def requires(self):
      yield TaskB()
      if not get_results_from_task_B_written_on_S3():
         print('Did not find any results and will exit')
         return
      else:
         print('Found results and will proceed')
         yield TaskC()
         results = get_results_from_task_C_written_on_S3():
         
         # do other stuff


class TaskB(luigi.Task):

   def run(self): 
      // process and write results to s3

   def output(self):
      return URITarget('b_path')
    

class TaskC(luigi.Task):
   def run(self): 
      // process and write results to s3

   def output(self):
      return URITarget('c_path')
 

Журналы Luigi показывают следующее:

 Did not find any results and will exit
Found results and will proceed
 

Мне кажется, что поток управления входит в оба if и else . Поскольку это в принципе невозможно, я подозреваю, что Луиджи дважды пытается запустить трубопровод. Как только он произведет это

 Did not find any results and will exit
 

Так как он не может найти никаких результатов, записанных на s3 из TaskB .
Затем TaskB фактически завершает его выполнение. Записывает свои результаты на s3. TaskA повторы. Находит результаты TaskB на s3 и выдает

 Found results and will proceed
 

Но тогда кажется yield TaskC , что это не работает. Он просто застрял там на неопределенное время.

Это всего лишь мое предположение о поведении Луиджи. Пожалуйста, дайте мне знать, если я ошибаюсь на этот счет.

Мне нужна эта модульная разбивка задач B и C на отдельные задачи, так как это значительно упрощает тестирование. TaskC это довольно сложные задачи, настройка тестирования которых была бы намного сложнее, чем тестирование ее составляющих отдельно.

Ответ №1:

Часть проблемы заключается в том, что requires() может вызываться несколько раз во время планирования. Поэтому при первом вызове функции TaskA.requires() она выдает TaskB. Но в следующий раз, когда будет вызвана функция TaskA.requires (), вы снова выдадите TaskB и нажмете на блок else. Этот первый вызов TaskA.requires() является единственным, который используется для фактических зависимостей планирования.

Я написал тестовую программу только для того, чтобы проверить это, и вы можете видеть в моих выходных данных, сколько раз вызывается функция TaskB.output ().

 import luigi

taskC_complete = False
taskB_complete = False


def get_results_from_task_C_written_on_S3():
    return taskC_complete


def get_results_from_task_B_written_on_S3():
    return taskB_complete


def set_taskB_complete():
    taskB_complete = True


def set_taskC_complete():
    taskC_complete = True


class TaskA(luigi.Task):

    def requires(self):
        yield TaskB()
        if not get_results_from_task_B_written_on_S3():
            print('Did not find any results and will exit')
            return
        else:
            print('Found results and will proceed')
            yield TaskC()
            results = get_results_from_task_C_written_on_S3()


class TaskB(luigi.Task):

    def run(self):
        print("Task B")

    def output(self):
        return print('b_path')


class TaskC(luigi.Task):
    def run(self):
        print("Task C")

    def output(self):
        return print('c_path')


if __name__ == '__main__':
    luigi_run_results = luigi.build([TaskA()], workers=1,
                                    local_scheduler=True, detailed_summary=True, log_level='INFO')

 

Этот код выводит

 Did not find any results and will exit
b_path
Task B
Did not find any results and will exit
b_path
Did not find any results and will exit
b_path
Did not find any results and will exit
b_path
 

Хотя код не является идеальной копией того, что вы пытаетесь сделать, вот выходные данные планировщика, которые показывают, что на самом деле будет выполняться:

 INFO: Informed scheduler that task   TaskA__99914b932b   has status   PENDING
INFO: Informed scheduler that task   TaskB__99914b932b   has status   PENDING
 

Я не уверен, чего именно вы пытаетесь достичь, но ознакомьтесь с их документацией о зависимостях задач. Вам лучше попытаться выполнить другие задачи в своей функции run() для TaskA.