Выполнение задачи независимо от статуса вышестоящей задачи

#directed-acyclic-graphs #airflow

#направленные ациклические графы #поток

Вопрос:

У меня есть относительно простой DAG, определенный следующим образом:

 set_variables >> create_conf_file >> check_running_job >> transform_data >> update_variables
create_conf_file >> remove_conf_file
check_running_stat_job >> remove_conf_file
transform_openidm_data >> remove_conf_file
update_orc_file_variable >> remove_conf_file
  

Моя цель здесь — гарантировать, что remove_conf_file всегда выполняется независимо от статуса всех предыдущих задач.

Я пытался использовать trigger_rule=TriggerRule.ALL_DONE в своем PythonOperator вызове, но remove_conf_file выполняется только в том случае, если все предыдущие задачи выполнены.
Если задача check_running_stat_job завершается неудачей, remove_conf_file задача не будет выполнена.

Я хочу, чтобы файл был удален независимо от статуса вышестоящих задач: ВЫПОЛНЕНО, СБОЙ, НЕ ВЫПОЛНЕНО.

Я пробовал несколько конфигураций DAG, но, похоже, ни одна из них не работает.

[ПРАВИТЬ]
Вот представление дерева базы данных и представление базы данных в Airflow:

Стандартный вид DAG

Представление дерева базы данных

Комментарии:

1. Можете ли вы поделиться скриншотом DAG? Кажется, в вашей базе данных есть два отключенных компонента. Это правда? Кроме того, вы хотите запустить remove_conf_file , как только какая-либо из вышестоящих задач выполнена / не выполнена / пропущена?

2. @MohammedKashif Я добавил картинки. Я хочу это, remove_file когда все сработало или что-либо не удалось между create_conf_file и update_variable . или любая другая вещь, если задача не завершается сбоем или изменениями, или что-либо еще, что бы ни случилось, файл должен быть удален в конце.

3. Триггер ALL_DONE выглядит правильно настроенным. Можете ли вы проверить, достигли ли вышестоящие задачи какого-либо из состояний SUCCESS, SKIPPED, UPSTREAM_FAILED или FAILED?

4. Я хочу сказать, что если какая-либо из моих задач по какой-либо причине завершается неудачей, попробуйте наконец что-нибудь сделать, но если check_running_config_stat_job завершается неудачей, это не вызовет start_etl и не вызовет remove_conf_file

Ответ №1:

Я воспроизвел структуру задачи, отображаемую на вашем изображении, и успешно выполнил задачу remove_conf_file после того, как она spark_etl завершилась неудачей:

введите описание изображения здесь

Ключевым моментом было добавление trigger_rule='all_done' к remove_conf_file задаче. Это не означает, что все вышестоящие задачи должны быть выполнены успешно, просто они должны быть завершены (независимо от успеха или неудачи). Я использовал BashOperator вместо PythonOperator, который вы упомянули в своем вопросе.

Этот репозиторий git содержит соответствующий файл dockerfile с DAG.

Редактировать:

Для справки, вот как выглядит успешный код, если мы хотим протестировать сбой spark_etl задачи и последующее успешное выполнение remove_conf_file задачи:

 t4 = BashOperator(
    task_id='spark_etl',
    bash_command='exit 123"',          # simulation of task failure
    dag=dag)
    
t6 = BashOperator(
    task_id='remove_conf_file',
    bash_command='echo "Task 6"',
    dag=dag,
    trigger_rule='all_done')
  

(Полный код можно найти в моем репозитории git, упомянутом выше.)

Комментарии:

1. Я попробую это, но, как вы сказали, это необходимо выполнить, если задача завершается неудачей, нисходящая задача не выполняется. Но я попробую, мне любопытно.

2. Спасибо за обновление. На самом деле я имел в виду, что с trigger_rule='all_done' , remove_conf_file может быть успешно выполнено даже после spark_etl сбоя задачи. Ключевое слово «все выполнено» сбивает с толку, потому что звучит так, будто сбой или пропущенный не считается «выполненным». Однако в этом случае «выполнено» означает что-то вроде «выполнено с выполнением (или попыткой запуска) задачи» (независимо от статуса задачи).

3. Теперь я знаю, когда это не работает, как только задача завершается неудачно, DAG помечается для повторной попытки, и только последняя попытка завершается. Я почти уверен, что это не является причиной моей проблемы. Я попробую это и подтвержду, если это так, я открою проблему на Airflow github, поскольку я думаю, что это важно