Отладка воздушного потока: как пропустить выполнение задания обратной засыпки при запуске DAG в vscode

#airflow

#воздушный поток

Вопрос:

У меня настроен воздушный поток, и я запускаю DAG, используя следующую конфигурацию отладки vscode:

 {
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "justMyCode": false,
            "env":{
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True",
                "LC_ALL": "en_US.UTF-8",
                "LANG": "en_US.UTF-8"
            }
        }
    ]
}
 

Он запускает файл, мои точки останова DAG defs прерываются, как и ожидалось, затем в конце файла: он выполняет dag.run() , а затем я вечно жду, пока dag снова заполнится, и мои точки останова в функциях python_callable задач никогда не прерываются.

Какой секрет воздушного потока я не вижу?

Вот мой dag:

 # scheduled to run every minute, poke for a new file every ten seconds
dag = DAG(
    dag_id='download-from-s3',
    start_date=days_ago(2),
    catchup=False,
    schedule_interval='*/1 * * * *',
    is_paused_upon_creation=False
)

def new_file_detection(**context):
 print("File found...") # a breakpoint here never lands
 pprint(context)

init = BashOperator(
    task_id='init',
    bash_command='echo "My DAG initiated at $(date)"',
    dag=dag,
)
 
file_sensor = S3KeySensor(
    task_id='file_sensor',
    poke_interval=10, # every 10 seconds
    timeout=60, 
    bucket_key="s3://inbox/new/*",
    bucket_name=None,
    wildcard_match=True,
    soft_fail=True,
    dag=dag
)

file_found_message = PythonOperator(
    task_id='file_found_message',
    provide_context=True,
    python_callable=new_file_detection,
    dag=dag
 )
 
init >> file_sensor >> file_found_message

if __name__ == '__main__':
    dag.clear(reset_dag_runs=True)
    dag.run() #this triggers a backfill job
 

Комментарии:

1. Я немного сбит с толку вашим описанием. Вы хотите сказать, что S3KeySensor никогда не переходит в режим успеха, поэтому PythonOperator никогда не запускается?

2. @Elad — Моя точка останова в PythonOperator никогда не срабатывает. Я вижу журналы, которые указывают, что задание обратной засыпки выполняется. И задание обратной засыпки выполняется практически вечно

3. Достигает ли S3KeySensor успеха?

4. да, это происходит, когда я запускаю веб-сервер и планировщик

Ответ №1:

Это работает для меня, как и ожидалось. Я могу установить точки останова на уровне DAG или внутри определения вызываемых объектов python и просмотреть их с помощью VSCode debugger.

Я использую те же настройки отладки, которые вы предоставили, но я изменил параметр reset_dag_runs=True на dag_run_state=State.NONE во dag.clear() время вызова, как указано на DebugExecutor странице документации. Я полагаю, что это изменилось в одном из последних выпусков.

Что касается обратной засыпки, я настраиваю catchup=False аргументы DAG (это работает в обоих направлениях). Важное примечание, я использую версию 2.0.0 Airflow.

Вот пример, использующий тот же код, example_xcomp.py который поставляется с установкой по умолчанию:

Настройки отладки:

 {
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "internalConsole",
            "justMyCode": false,
            "env":{
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True",
            }
        }
    ]
}
 

Пример DAG:

 import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'excom_xample',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False
)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    logging.info("log before PUSH")  # <<<<<<<<<<< Before landing on breakpoint
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and
        check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("PRINT Line after breakpoint ")  # <<<< After landing on breakpoint
    if pulled_value_1 != value_1:
        raise ValueError("The two values differ"
                         f"{pulled_value_1} and {value_1}")

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=['push', 'push_by_returning'])
    if pulled_value_1 != value_1:
        raise ValueError(
            f'The two values differ {pulled_value_1} and {value_1}')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')


push1 = PythonOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
)

push2 = PythonOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id='puller',
    dag=dag,
    python_callable=puller,
)

pull << [push1, push2]

if __name__ == '__main__':
    from airflow.utils.state import State
    dag.clear(dag_run_state=State.NONE)
    dag.run()