#python #airflow #directed-acyclic-graphs
Вопрос:
Допустим, у меня есть три функции:
f1 = get_data(query):
df = pandas.read_sql(query)
return df
f2 = change_data(df):
df["mean"] = df["col1"].mean()
return df
f3 = push_new_data(data_base):
#push some new data to the database
return 0
Мне нужен один DAG, если данные из f1
него пусты, и еще один, если это не так. Если мы назовем нашу задачу f1
как t1
, f2
как t2
и f3
как t3
, то код DAG-pesudo будет выглядеть примерно так
df = t1(query)
if df.shape[0]>0: #some data
t1>>t2
else:
t1>>t3
но я сомневаюсь, что это правильный способ сделать это
Комментарии:
1. Я действительно не понимаю, что вы здесь делаете. Функции PythonOperator не должны возвращать df. Похоже, что ваш код является родным python, и вы пытаетесь заставить воздушный поток работать над ним, а не рефакторировать его для использования с воздушным потоком. В любом случае вы можете использовать
TriggerDagRunOperator
для вызова любой DAG, которая вам понадобится.2. Я отредактировал его — последний фрагмент больше похож на какой-то псевдокод, объясняющий, чего я хочу. Если бы я мог написать так, чтобы это сработало, то мне не нужно было бы публиковать вопрос 😉
3. Проблема в том, что неясно, связана ли ваша проблема с поиском правильного способа установки зависимостей, чтобы ваш рабочий процесс выполнял/пропускал правильные задачи в соответствии с вашей логикой, или речь идет о передаче информации между задачами. Концепция получения данных в одной функции и преобразования их в другую не является простой, потому что вы не можете обмениваться данными между задачами (вы можете поделиться некоторыми метаданными, такими как статус, короткая строка и т. Д., Но не df).
4. для большей ясности. Код, который вы опубликовали (на мой взгляд), должен быть в 1 PythonOperator с возможностью вызова 1 python.