#python #pipeline #luigi
#python #конвейер #luigi
Вопрос:
Я начинаю использовать Луиджи. Я построил конвейер, который выполняет несколько задач, и я был достаточно осторожен, чтобы убедиться, что задачи работают хорошо. Итак, конвейер работает хорошо. Во время построения конвейера, когда в задачах возникали сбои, о них сообщали, :(
и я редактировал их, пока они не работали нормально.
Итак, допустим, у меня есть конвейер, который выполняет
Task1-->Task2--> Task3
В этом случае, если Task2 завершается с ошибкой, Task3 не выполняется, и конвейер останавливается на этом. Обычно из-за ошибки при написании Задачы2.
Теперь представьте, что есть 5 «Задач1», 5 «Задач2» и одна «Задач3». Итак, Task3 — это своего рода итоговая задача.
Я бы хотел, чтобы мой конвейер не останавливался при возникновении сбоя, а пропускал (и, возможно, регистрировал сбой) и переходил к следующему обращению. (Эти «сбои» будут вызваны не тем, что задача плохо написана, а тем, что, скажем, данные, которые вводятся в эту задачу, повреждены в реальном сценарии)
Что-то вроде
Там вы можете видеть, что Task1 выполняется перед Task2. Задачи, отмеченные красным, являются «сбоями».
Итак, что я хотел бы, чтобы конвейер выполнял задачи 1 и задачи 2, регистрировал сбои, продолжал и, наконец, подводил итоги с помощью Task3 (даже включая какой-то отчет о том, что были сбои)
Как я могу сделать это с помощью Луиджи?
Ответ №1:
Я думаю, вам нужно будет добавить оператор try / except к вашим необязательным задачам, чтобы при сбое они все равно генерировали фиктивный вывод, и Луиджи думает, что все в порядке.
Ответ №2:
В Luigi у вас есть события. Я предполагаю, что вы можете использовать событие сбоя и управлять ошибкой там. Вы можете сгенерировать файл, который будет работать как контрольная точка для неудачной задачи, и записать сообщение, которое вы предпочитаете.
Официальная документация: https://luigi.readthedocs.io/en/stable/tasks.html#events-and-callbacks
@luigi.contrib.hadoop.JobTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
"""Will be called directly after a failed execution
of `run` on any JobTask subclass
"""
...