#airflow
#воздушный поток
Вопрос:
У меня настроен воздушный поток, и я запускаю DAG, используя следующую конфигурацию отладки vscode:
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false,
"env":{
"AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
"AIRFLOW__DEBUG__FAIL_FAST": "True",
"LC_ALL": "en_US.UTF-8",
"LANG": "en_US.UTF-8"
}
}
]
}
Он запускает файл, мои точки останова DAG defs прерываются, как и ожидалось, затем в конце файла: он выполняет dag.run()
, а затем я вечно жду, пока dag снова заполнится, и мои точки останова в функциях python_callable задач никогда не прерываются.
Какой секрет воздушного потока я не вижу?
Вот мой dag:
# scheduled to run every minute, poke for a new file every ten seconds
dag = DAG(
dag_id='download-from-s3',
start_date=days_ago(2),
catchup=False,
schedule_interval='*/1 * * * *',
is_paused_upon_creation=False
)
def new_file_detection(**context):
print("File found...") # a breakpoint here never lands
pprint(context)
init = BashOperator(
task_id='init',
bash_command='echo "My DAG initiated at $(date)"',
dag=dag,
)
file_sensor = S3KeySensor(
task_id='file_sensor',
poke_interval=10, # every 10 seconds
timeout=60,
bucket_key="s3://inbox/new/*",
bucket_name=None,
wildcard_match=True,
soft_fail=True,
dag=dag
)
file_found_message = PythonOperator(
task_id='file_found_message',
provide_context=True,
python_callable=new_file_detection,
dag=dag
)
init >> file_sensor >> file_found_message
if __name__ == '__main__':
dag.clear(reset_dag_runs=True)
dag.run() #this triggers a backfill job
Комментарии:
1. Я немного сбит с толку вашим описанием. Вы хотите сказать, что S3KeySensor никогда не переходит в режим успеха, поэтому PythonOperator никогда не запускается?
2. @Elad — Моя точка останова в PythonOperator никогда не срабатывает. Я вижу журналы, которые указывают, что задание обратной засыпки выполняется. И задание обратной засыпки выполняется практически вечно
3. Достигает ли S3KeySensor успеха?
4. да, это происходит, когда я запускаю веб-сервер и планировщик
Ответ №1:
Это работает для меня, как и ожидалось. Я могу установить точки останова на уровне DAG или внутри определения вызываемых объектов python и просмотреть их с помощью VSCode debugger.
Я использую те же настройки отладки, которые вы предоставили, но я изменил параметр reset_dag_runs=True
на dag_run_state=State.NONE
во dag.clear()
время вызова, как указано на DebugExecutor
странице документации. Я полагаю, что это изменилось в одном из последних выпусков.
Что касается обратной засыпки, я настраиваю catchup=False
аргументы DAG (это работает в обоих направлениях). Важное примечание, я использую версию 2.0.0 Airflow.
Вот пример, использующий тот же код, example_xcomp.py
который поставляется с установкой по умолчанию:
Настройки отладки:
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "internalConsole",
"justMyCode": false,
"env":{
"AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
"AIRFLOW__DEBUG__FAIL_FAST": "True",
}
}
]
}
Пример DAG:
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
dag = DAG(
'excom_xample',
schedule_interval="@once",
start_date=days_ago(2),
default_args={'owner': 'airflow'},
tags=['example'],
catchup=False
)
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
def push(**kwargs):
"""Pushes an XCom without a specific target"""
logging.info("log before PUSH") # <<<<<<<<<<< Before landing on breakpoint
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
def push_by_returning(**kwargs):
"""Pushes an XCom without a specific target, just by returning it"""
return value_2
def puller(**kwargs):
"""Pull all previously pushed XComs and
check if the pushed values match the pulled values."""
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
print("PRINT Line after breakpoint ") # <<<< After landing on breakpoint
if pulled_value_1 != value_1:
raise ValueError("The two values differ"
f"{pulled_value_1} and {value_1}")
# get value_2
pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
if pulled_value_2 != value_2:
raise ValueError(
f'The two values differ {pulled_value_2} and {value_2}')
# get both value_1 and value_2
pulled_value_1, pulled_value_2 = ti.xcom_pull(
key=None, task_ids=['push', 'push_by_returning'])
if pulled_value_1 != value_1:
raise ValueError(
f'The two values differ {pulled_value_1} and {value_1}')
if pulled_value_2 != value_2:
raise ValueError(
f'The two values differ {pulled_value_2} and {value_2}')
push1 = PythonOperator(
task_id='push',
dag=dag,
python_callable=push,
)
push2 = PythonOperator(
task_id='push_by_returning',
dag=dag,
python_callable=push_by_returning,
)
pull = PythonOperator(
task_id='puller',
dag=dag,
python_callable=puller,
)
pull << [push1, push2]
if __name__ == '__main__':
from airflow.utils.state import State
dag.clear(dag_run_state=State.NONE)
dag.run()