#python #airflow
#python #воздушный поток
Вопрос:
Кто-нибудь может мне помочь? Я хочу set_downstream для моего списка операторов. Но они не согласованы.
for start_date, end_date in zip(start_dates_lst, end_dates_lst):
dynamic_clear_load_data_task.append(PythonOperator(
task_id=f'data_clearing_{start_date}_{end_date}_task',
op_kwargs={'table_name': USERS_TAB,
'schema': TABLEAU_SCHEMA,
'all_users_task_id': f'select_all_{start_date}_{end_date}_users_task'},
python_callable=delete_rows,
provide_context=True,
pool=POOL_LONG
))
dynamic_clear_load_data_task.append(VerticaToVerticaOperator(
task_id=f'data_loading_{start_date}_{end_date}_task',
tasks=[f'select_all_{start_date}_{end_date}_users_task'],
vertica_table=USERS_TAB,
schema=TABLEAU_SCHEMA,
action='create',
pool=POOL_LONG
))
for index, task in enumerate(dynamic_clear_load_data_task[1:]):
dynamic_clear_load_data_task[index - 1].set_downstream(task)
Но я не получаю VerticaToVerticaOperator после PythonOperator . Я получаю все VerticaToVerticaOperator и только после них я получаю весь PythonOperator.
Я хочу получить PythonOperator после даты VerticaToVerticaOperator по дате
Ответ №1:
Я не знаю почему, но все было в порядке, тогда я прокомментировал свою другую часть consistent of operators.
Ответ №2:
Смотрите https://airflow.apache.org/docs/1.10.3/_modules/airflow/utils/helpers.html
chain(task_1, task_2, task_3, task_4)
task_1.set_downstream(task_2)
task_2.set_downstream(task_3)
task_3.set_downstream(task_4)
# also you can do
l = list(task_1, task_2, task_3, task_4)
chain(*l)