Воздушный поток: как set_dowstream для списка операторов

#python #airflow

#python #воздушный поток

Вопрос:

Кто-нибудь может мне помочь? Я хочу set_downstream для моего списка операторов. Но они не согласованы.

 for start_date, end_date in zip(start_dates_lst, end_dates_lst):
        dynamic_clear_load_data_task.append(PythonOperator(
            task_id=f'data_clearing_{start_date}_{end_date}_task',
            op_kwargs={'table_name': USERS_TAB,
                       'schema': TABLEAU_SCHEMA,
                       'all_users_task_id': f'select_all_{start_date}_{end_date}_users_task'},
            python_callable=delete_rows,
            provide_context=True,
            pool=POOL_LONG
        ))
        dynamic_clear_load_data_task.append(VerticaToVerticaOperator(
            task_id=f'data_loading_{start_date}_{end_date}_task',
            tasks=[f'select_all_{start_date}_{end_date}_users_task'],
            vertica_table=USERS_TAB,
            schema=TABLEAU_SCHEMA,
            action='create',
            pool=POOL_LONG
        ))
for index, task in enumerate(dynamic_clear_load_data_task[1:]):
    dynamic_clear_load_data_task[index - 1].set_downstream(task)
  

Но я не получаю VerticaToVerticaOperator после PythonOperator . Я получаю все VerticaToVerticaOperator и только после них я получаю весь PythonOperator.
Я хочу получить PythonOperator после даты VerticaToVerticaOperator по дате

Ответ №1:

Я не знаю почему, но все было в порядке, тогда я прокомментировал свою другую часть consistent of operators.

Ответ №2:

Смотрите https://airflow.apache.org/docs/1.10.3/_modules/airflow/utils/helpers.html

 chain(task_1, task_2, task_3, task_4)
task_1.set_downstream(task_2)
task_2.set_downstream(task_3)
task_3.set_downstream(task_4)
# also you can do
l = list(task_1, task_2, task_3, task_4)
chain(*l)