#python #docker #airflow
Вопрос:
Я пытаюсь запустить свой проект python с помощью airflow 2.0.1 в docker.
Итак, вот структура каталога:
группы поддержки находятся здесь: path_to_airflow/dag/ код проекта python находится здесь: path_to_airflow/dag/utils
Я столкнулся со странным отношением воздушного потока к исключениям: моя задача PythonOperator всегда заканчивается отметкой успеха и статусом кода выхода 0, независимо от того, есть ли какое-либо исключение или нет. Может ли кто-нибудь помочь мне в решении этой проблемы?
Вот код dag:
from datetime import timedelta
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from utils.main import main
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'test',
default_args=default_args,
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1, 0, 0),
max_active_runs=1,
catchup=False,
)
task_dummy = DummyOperator(
task_id='task_dummy',
dag=dag,
)
task_1 = PythonOperator(
task_id='task_1',
python_callable=main,
dag=dag,
)
task_dummy >> task_1
Вот код основной функции:
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from airflow.exceptions import AirflowException
def main(data_path='/opt/airflow/dags/data/'):
...
some code
...
if condition:
raise AirflowException('empty data')
if __name__ == '__main__':
main()
Все исключения могут быть пойманы, заменив PythonOperator на BashOperator, но я просто хочу понять, что не так. Для тестирования вы можете просто оставить исключение без условий в основной функции. Спасибо, что уделили мне время.
Комментарии:
1. Ваш второй снимок экрана сделан из другой группы поддержки и задачи.
2. Я изменил названия исходного кода, просто чтобы сделать его проще, извините.
3. @JarekPotiuk Я отредактировал скриншот.
4. Откуда мне знать, что вы посмотрели на правильную dag/задачу? Я думаю, что вы, возможно, просто ошиблись и посмотрели на неправильный статус задачи/dag
5. Да, я согласен с этим. Так что я тоже был уверен. В любом случае, я воссоздал dag с другим именем с нуля, и все работало так, как должно быть. Извините за беспокойство. Может быть, я немного переутомился.
Ответ №1:
Как обсуждалось в комментарии, проблема заключалась в неправильном наблюдении.