Создавайте различные DAG на основе результатов выполнения задачи в потоке воздуха

#python #airflow #directed-acyclic-graphs

Вопрос:

Допустим, у меня есть три функции:

 f1 = get_data(query):
       df = pandas.read_sql(query)
       return df

f2 = change_data(df):
      df["mean"] = df["col1"].mean()
      return df

f3 = push_new_data(data_base):
      #push some new data to the database
      return 0

 

Мне нужен один DAG, если данные из f1 него пусты, и еще один, если это не так. Если мы назовем нашу задачу f1 как t1 , f2 как t2 и f3 как t3 , то код DAG-pesudo будет выглядеть примерно так

 
df = t1(query)
if df.shape[0]>0: #some data
    t1>>t2
else:
   t1>>t3
 

но я сомневаюсь, что это правильный способ сделать это

Комментарии:

1. Я действительно не понимаю, что вы здесь делаете. Функции PythonOperator не должны возвращать df. Похоже, что ваш код является родным python, и вы пытаетесь заставить воздушный поток работать над ним, а не рефакторировать его для использования с воздушным потоком. В любом случае вы можете использовать TriggerDagRunOperator для вызова любой DAG, которая вам понадобится.

2. Я отредактировал его — последний фрагмент больше похож на какой-то псевдокод, объясняющий, чего я хочу. Если бы я мог написать так, чтобы это сработало, то мне не нужно было бы публиковать вопрос 😉

3. Проблема в том, что неясно, связана ли ваша проблема с поиском правильного способа установки зависимостей, чтобы ваш рабочий процесс выполнял/пропускал правильные задачи в соответствии с вашей логикой, или речь идет о передаче информации между задачами. Концепция получения данных в одной функции и преобразования их в другую не является простой, потому что вы не можете обмениваться данными между задачами (вы можете поделиться некоторыми метаданными, такими как статус, короткая строка и т. Д., Но не df).

4. для большей ясности. Код, который вы опубликовали (на мой взгляд), должен быть в 1 PythonOperator с возможностью вызова 1 python.