Оператор ветки пропускает поток воздуха

#python #airflow

Вопрос:

У меня есть этот поток:-

 execution_date_hour = "{{ execution_date.strftime('%H') }}"

default_args = {
    'owner': 'hourly-airflow',
    'depends_on_past': False,
    'catch_up': False,
    'start_date': days_ago(1),
    'email': failure_email_list,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('hourly_pipeline_dag',
          default_args=default_args,
          tags=['hourly'],
          schedule_interval='@hourly',
          catchup=False)

taskA = PostgresOperator(dag=dag,
                         task_id='taskA', 
                         postgres_conn_id='database_connection',
                         sql='sql/hourly_entry.sql')

taskb = DummyOperator(
    dag=dag,
    task_id="taskb"
)

taske = DummyOperator(
    dag=dag,
    task_id="taske"
)

taskc = DummyOperator(
    dag=dag,
    task_id="taskc"
)

taskd = DummyOperator(
    dag=dag,
    task_id="taskd"
)

branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=lambda
        **kwargs: 'feed_sensor_a' if execution_date_hour == '5' else 'feed_sensor_b',
    dag=dag)

feed_sensor_a = SqlSensor(dag=dag,
                          task_id='feed_sensor_a',
                          conn_id='database_connection',
                          sql='sql/sensor_hourly.sql',
                          poke_interval=30,
                          trigger_rule=TriggerRule.ONE_SUCCESS,
                          timeout=3600)

feed_sensor_b = SqlSensor(dag=dag,
                          task_id='feed_sensor_b',
                          conn_id='database_connection',
                          sql='sql/sensor.sql',
                          poke_interval=30,
                          trigger_rule=TriggerRule.ONE_SUCCESS,
                          timeout=3600)

taskA >> [taskb,taskc]
taskb >> taskd
taskc >> taske
[taskd,taske] >> branch_op
branch_op >> [feed_sensor_a,feed_sensor_b] 
 

Конвейер выполняется до тех пор, пока taskd и taske, branch_op не будут пропущены. Пожалуйста, помогите, я застрял на этом так надолго. До taske и taskd все работает нормально, branch_op выделен красным, т. е. пропущен, не знаю, что здесь происходит. (Все эти задачи являются фиктивными задачами, на самом деле они являются HttpOperator и Postgres op).
Заранее спасибо, дайте мне знать, если потребуется какая-либо другая информация.

Ответ №1:

Запустив ваш код, я не вижу branch_op , чтобы задача была выполнена неудачно или пропущена. Однако я не думаю, что ваша BranchPythonOperator задача будет работать так, как вам хотелось бы. В лямбда-функцию не передаются входные данные, и python_callable она не является шаблонным полем для оператора (т. Е. Логика вычисляется в литеральной строке "{{ execution_date.strftime('%H') }}" , поэтому поток всегда будет следовать feed_sensor_b . Попробуйте это вместо этого:

 branch_op = BranchPythonOperator(
    task_id="branch_op",
    python_callable=lambda execution_date_hour: "feed_sensor_a" if execution_date_hour == "5" else "feed_sensor_b",
    op_args=[execution_date_hour],
    dag=dag,
)
 

Комментарии:

1. Спасибо, что уделили мне время, это сработало. Можете ли вы помочь мне с еще одним сомнением, например, дата выполнения 2021-06-03 08:00:00 для запуска dag, но поскольку запуск dag выполняется, дата выполнения меняется, а затем, когда я снова обращусь к нему, он вернет ту же дату выполнения, что и 08 или 09.

2. Я получаю доступ к дате выполнения в виде макроса {{ execution_date }}

3. @ArpitPruthi execution_date Входящий поток воздуха-это не фактическая дата/время запуска, а скорее отметка времени начала периода его расписания. Это означает execution_date , что для одного и того же запуска DAG не должно изменяться при повторном запуске и не будет изменяться при выполнении DAG. Совершенно новый экземпляр запуска DAG изменит execution_date , так как это приведет к другому периоду расписания. Вам нужна более динамичная метка времени для вашей логики, чтобы метка времени менялась на реальное время настенных часов?

4. Спасибо, Джош понял твою точку зрения… Можете ли вы помочь мне с {{ execution_date.subtract(часы = 5) }}, здесь вместо жесткого кодирования 5 я хочу передать его как переменную, я не могу найти способ передать переменную и, наконец, использовать значение переменной.

5. @ArpitPruthi Вы думали о передаче этого значения в качестве переменной воздушного потока, в качестве вывода из какой-либо функции или каким-либо другим способом?

Ответ №2:

Я не могу воспроизвести сбой в задании branch_op . Однако, даже если бы он был запущен , он всегда был в else состоянии, потому BranchPythonOperator что автоматически не отображается execution_date в списке полей шаблона. Поэтому я сделал две вещи,

  • Немного переформатировал DAG
  • Обновленное условие для проверки 05 , потому %H что переходит в этот формат
  • Использовал аргумент op_kwargs BranchPythonOperator , чтобы скоротать час.
 from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta

execution_date_hour = "{{ execution_date.strftime('%H') }}"
failure_email_list = ['myemail']

default_args = {
    'owner': 'hourly-airflow',
    'depends_on_past': False,
    'catch_up': False,
    'start_date': datetime(2020, 12, 14, 0, 0),
    'email': failure_email_list,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
}

dag = DAG('hourly_pipeline_dag',
          default_args=default_args,
          tags=['hourly'],
          schedule_interval='@hourly',
          catchup=False)

with dag:
    taskA = DummyOperator(task_id='taskA')
    taskB = DummyOperator(task_id='taskB')
    taskC = DummyOperator(task_id='taskC')
    taskD = DummyOperator(task_id='taskD')
    taskE = DummyOperator(task_id='taskE')

    branch_op = BranchPythonOperator(
        task_id= f'branch_op',
        python_callable=lambda hour: 'feed_sensor_5' if hour == "05" else 'feed_sensor_not5',
        op_kwargs={'hour': execution_date_hour},
    )
    feed_sensor_5 = DummyOperator(task_id='feed_sensor_5')
    feed_sensor_not5 = DummyOperator(task_id='feed_sensor_not5')

    taskA >> [taskB,taskC]
    taskB >> taskD
    taskC >> taskE
    [taskD,taskE] >> branch_op
    branch_op >> [feed_sensor_5,feed_sensor_not5] 
 

Теперь он переходит в if для выполнения 2021-05-01 05:00:00 и в else для 2021-05-01 06:00:00

Комментарии:

1. Спасибо, что это сработало для меня. Можете ли вы помочь мне с еще одним сомнением, например, дата выполнения 2021-06-03 08:00:00 для запуска dag, но поскольку запуск dag выполняется, дата выполнения меняется, а затем, когда я снова обращусь к нему, он вернет ту же дату выполнения, что и 08 или 09.

2. Я получаю доступ к дате выполнения в виде макроса {{ execution_date }}