как Луиджи обрабатывает сбой задачи?

#python #pipeline #luigi

#python #конвейер #luigi

Вопрос:

Я начинаю использовать Луиджи. Я построил конвейер, который выполняет несколько задач, и я был достаточно осторожен, чтобы убедиться, что задачи работают хорошо. Итак, конвейер работает хорошо. Во время построения конвейера, когда в задачах возникали сбои, о них сообщали, :( и я редактировал их, пока они не работали нормально.

Итак, допустим, у меня есть конвейер, который выполняет

 Task1-->Task2--> Task3
  

В этом случае, если Task2 завершается с ошибкой, Task3 не выполняется, и конвейер останавливается на этом. Обычно из-за ошибки при написании Задачы2.

Теперь представьте, что есть 5 «Задач1», 5 «Задач2» и одна «Задач3». Итак, Task3 — это своего рода итоговая задача.

Я бы хотел, чтобы мой конвейер не останавливался при возникновении сбоя, а пропускал (и, возможно, регистрировал сбой) и переходил к следующему обращению. (Эти «сбои» будут вызваны не тем, что задача плохо написана, а тем, что, скажем, данные, которые вводятся в эту задачу, повреждены в реальном сценарии)

Что-то вроде

Конвейер со сбоями

Там вы можете видеть, что Task1 выполняется перед Task2. Задачи, отмеченные красным, являются «сбоями».

Итак, что я хотел бы, чтобы конвейер выполнял задачи 1 и задачи 2, регистрировал сбои, продолжал и, наконец, подводил итоги с помощью Task3 (даже включая какой-то отчет о том, что были сбои)

Как я могу сделать это с помощью Луиджи?

Ответ №1:

Я думаю, вам нужно будет добавить оператор try / except к вашим необязательным задачам, чтобы при сбое они все равно генерировали фиктивный вывод, и Луиджи думает, что все в порядке.

Ответ №2:

В Luigi у вас есть события. Я предполагаю, что вы можете использовать событие сбоя и управлять ошибкой там. Вы можете сгенерировать файл, который будет работать как контрольная точка для неудачной задачи, и записать сообщение, которое вы предпочитаете.

Официальная документация: https://luigi.readthedocs.io/en/stable/tasks.html#events-and-callbacks

 @luigi.contrib.hadoop.JobTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
       of `run` on any JobTask subclass
    """
    ...