Задача PythonOperator воздушного потока не терпит неудачи, несмотря ни на что

#python #docker #airflow

Вопрос:

Я пытаюсь запустить свой проект python с помощью airflow 2.0.1 в docker.

Итак, вот структура каталога:

группы поддержки находятся здесь: path_to_airflow/dag/ код проекта python находится здесь: path_to_airflow/dag/utils

Я столкнулся со странным отношением воздушного потока к исключениям: моя задача PythonOperator всегда заканчивается отметкой успеха и статусом кода выхода 0, независимо от того, есть ли какое-либо исключение или нет. Может ли кто-нибудь помочь мне в решении этой проблемы?

Вот код dag:

 from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator

from utils.main import main

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1, 0, 0),
    max_active_runs=1,
    catchup=False,
)

task_dummy = DummyOperator(
    task_id='task_dummy',
    dag=dag,
)

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=main,
    dag=dag,
)

task_dummy >> task_1
 

Вот код основной функции:

 import os
import sys

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

from airflow.exceptions import AirflowException

def main(data_path='/opt/airflow/dags/data/'):

    ...
    some code
    ...
    
    if condition:
        raise AirflowException('empty data')

if __name__ == '__main__':
    main()
 

Вот журнал дага:
введите описание изображения здесь

Вот окончательный статус dag: введите описание изображения здесь

Все исключения могут быть пойманы, заменив PythonOperator на BashOperator, но я просто хочу понять, что не так. Для тестирования вы можете просто оставить исключение без условий в основной функции. Спасибо, что уделили мне время.

Комментарии:

1. Ваш второй снимок экрана сделан из другой группы поддержки и задачи.

2. Я изменил названия исходного кода, просто чтобы сделать его проще, извините.

3. @JarekPotiuk Я отредактировал скриншот.

4. Откуда мне знать, что вы посмотрели на правильную dag/задачу? Я думаю, что вы, возможно, просто ошиблись и посмотрели на неправильный статус задачи/dag

5. Да, я согласен с этим. Так что я тоже был уверен. В любом случае, я воссоздал dag с другим именем с нуля, и все работало так, как должно быть. Извините за беспокойство. Может быть, я немного переутомился.

Ответ №1:

Как обсуждалось в комментарии, проблема заключалась в неправильном наблюдении.