Как динамически генерировать список нижестоящих задач из задачи в потоке

#python #airflow

Вопрос:

У меня есть основная задача, содержащая логику в этой функции

Я не совсем уверен, как это сделать. Может быть, мне нужно еще одно промежуточное задание? Любая помощь приветствуется. Спасибо!

Комментарии:

1. tasks_to_run будет отправлен в xcom. вы можете извлечь список в следующей задаче. Если вы хотите создавать динамические задачи на основе этого списка — это анти-шаблон для воздушного потока.

2. У меня уже есть задачи, выполненные в этом сценарии. В принципе, у меня есть первое задание, которое проверяет, соответствуют ли все эти задачи условию. Если они это сделают, мне нужно будет запустить их. Если нет, мне нужно не запускать их. По существу, нижестоящий список выполнения должен быть динамически определен на основе результата 1-й задачи

3. В вашем коде get_campaign_active нажимает на xcom список. поэтому, если list_of_tasks выполнит запрос xcom, вы получите нужные вам значения.

4. Итак, я не могу получить возвращаемые результаты задачи get_campaign_active, если я не получу их в другой задаче через .xcom_pull?

5. Эй, @nick_rinaldi, мне было интересно, был ли вам полезен мой ответ ниже.

Ответ №1:

Если я правильно понял, у вас уже создано несколько задач, но вам нужно динамически определить, какая из них будет выполняться ниже по потоку. Если это так, вы можете безопасно использовать BranchPythonOperator:

Он выводит PythonOperator и ожидает, что функция Python вернет один идентификатор задачи или список идентификаторов задач. Возвращаемые идентификаторы задач должны указывать на задачу, расположенную непосредственно ниже по потоку от {self}. Все остальные «ветви» или непосредственно нижестоящие задачи помечены состоянием пропущено, чтобы эти пути не могли двигаться вперед. Пропущенные состояния распространяются вниз по течению, чтобы обеспечить заполнение состояния DAG и вывод состояния запуска DAG.

Рассмотрим следующее на основе example_dag, распределенного с помощью воздушного потока:

 with DAG(
    dag_id="branch_multiple_tasks",
    default_args=args,
    start_date=days_ago(1),
    schedule_interval="@daily",
    tags=["example"],
) as dag:

    run_this_first = DummyOperator(
        task_id="run_this_first",
    )

    options = ["branch_a", "branch_b", "branch_c", "branch_d"]

    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=lambda: options[1:3],
    )
    run_this_first >> branching

    join = DummyOperator(
        task_id="join",
        trigger_rule="none_failed_or_skipped",
    )

    for option in options:
        t = DummyOperator(
            task_id=option,
        )

        dummy_follow = DummyOperator(
            task_id="follow_"   option,
        )

        # Label is optional here, but it can help identify more complex branches
        branching >> Label(option) >> t >> dummy_follow >> join
 

В этом примере python_callable переданная branching задача жестко закодирована для возврата ['branch_b', 'branch_c'] .
Вы можете предоставить свой собственный вызываемый объект и вернуть список tasks_id » s » в виде строки на основе любых критериев. Вы даже можете использовать свою get_campaign_active функцию до тех пор, пока вы возвращаете ожидаемый формат. Может быть, будет чище, если вы создадите новую функцию и выполните xcom_pull ее на основе предыдущей. Я думаю, это зависит от ваших потребностей.

Представление графика:

graph_view_of_the_пример

Дай мне знать, сработало ли это для тебя!

Комментарии:

1. Эй, спасибо за ответ. Я нашел решение, и у меня должна быть куча задач, выполняемых параллельно, ответвляющихся от одной основной задачи.