#directed-acyclic-graphs #airflow
#направленные ациклические графы #поток
Вопрос:
У меня есть относительно простой DAG, определенный следующим образом:
set_variables >> create_conf_file >> check_running_job >> transform_data >> update_variables
create_conf_file >> remove_conf_file
check_running_stat_job >> remove_conf_file
transform_openidm_data >> remove_conf_file
update_orc_file_variable >> remove_conf_file
Моя цель здесь — гарантировать, что remove_conf_file
всегда выполняется независимо от статуса всех предыдущих задач.
Я пытался использовать trigger_rule=TriggerRule.ALL_DONE
в своем PythonOperator
вызове, но remove_conf_file
выполняется только в том случае, если все предыдущие задачи выполнены.
Если задача check_running_stat_job
завершается неудачей, remove_conf_file
задача не будет выполнена.
Я хочу, чтобы файл был удален независимо от статуса вышестоящих задач: ВЫПОЛНЕНО, СБОЙ, НЕ ВЫПОЛНЕНО.
Я пробовал несколько конфигураций DAG, но, похоже, ни одна из них не работает.
[ПРАВИТЬ]
Вот представление дерева базы данных и представление базы данных в Airflow:
Комментарии:
1. Можете ли вы поделиться скриншотом DAG? Кажется, в вашей базе данных есть два отключенных компонента. Это правда? Кроме того, вы хотите запустить
remove_conf_file
, как только какая-либо из вышестоящих задач выполнена / не выполнена / пропущена?2. @MohammedKashif Я добавил картинки. Я хочу это,
remove_file
когда все сработало или что-либо не удалось междуcreate_conf_file
иupdate_variable
. или любая другая вещь, если задача не завершается сбоем или изменениями, или что-либо еще, что бы ни случилось, файл должен быть удален в конце.3. Триггер ALL_DONE выглядит правильно настроенным. Можете ли вы проверить, достигли ли вышестоящие задачи какого-либо из состояний SUCCESS, SKIPPED, UPSTREAM_FAILED или FAILED?
4. Я хочу сказать, что если какая-либо из моих задач по какой-либо причине завершается неудачей, попробуйте наконец что-нибудь сделать, но если check_running_config_stat_job завершается неудачей, это не вызовет start_etl и не вызовет remove_conf_file
Ответ №1:
Я воспроизвел структуру задачи, отображаемую на вашем изображении, и успешно выполнил задачу remove_conf_file
после того, как она spark_etl
завершилась неудачей:
Ключевым моментом было добавление trigger_rule='all_done'
к remove_conf_file
задаче. Это не означает, что все вышестоящие задачи должны быть выполнены успешно, просто они должны быть завершены (независимо от успеха или неудачи). Я использовал BashOperator вместо PythonOperator, который вы упомянули в своем вопросе.
Этот репозиторий git содержит соответствующий файл dockerfile с DAG.
Редактировать:
Для справки, вот как выглядит успешный код, если мы хотим протестировать сбой spark_etl
задачи и последующее успешное выполнение remove_conf_file
задачи:
t4 = BashOperator(
task_id='spark_etl',
bash_command='exit 123"', # simulation of task failure
dag=dag)
t6 = BashOperator(
task_id='remove_conf_file',
bash_command='echo "Task 6"',
dag=dag,
trigger_rule='all_done')
(Полный код можно найти в моем репозитории git, упомянутом выше.)
Комментарии:
1. Я попробую это, но, как вы сказали, это необходимо выполнить, если задача завершается неудачей, нисходящая задача не выполняется. Но я попробую, мне любопытно.
2. Спасибо за обновление. На самом деле я имел в виду, что с
trigger_rule='all_done'
,remove_conf_file
может быть успешно выполнено даже послеspark_etl
сбоя задачи. Ключевое слово «все выполнено» сбивает с толку, потому что звучит так, будто сбой или пропущенный не считается «выполненным». Однако в этом случае «выполнено» означает что-то вроде «выполнено с выполнением (или попыткой запуска) задачи» (независимо от статуса задачи).3. Теперь я знаю, когда это не работает, как только задача завершается неудачно, DAG помечается для повторной попытки, и только последняя попытка завершается. Я почти уверен, что это не является причиной моей проблемы. Я попробую это и подтвержду, если это так, я открою проблему на Airflow github, поскольку я думаю, что это важно