Простой способ выполнения задач, определенных как DAG, в python?

# #python #flask #airflow #google-cloud-run #directed-acyclic-graphs

Вопрос:

Я выполняю ряд задач, которые сложным образом зависят друг от друга. Я хотел бы описать эти зависимости как DAG (направленный ациклический граф) и выполнить график, когда это необходимо.

Я смотрел на воздушный поток и написал фиктивный сценарий:

 from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def cloud_runner():
    # my typical usage here would be a http call to a service (e.g. gcp cloudrun)
    pass


with DAG(dag_id="my_id", schedule_interval=None, start_date=datetime.max) as dag:
    first_task = PythonOperator(task_id="1", python_callable=cloud_runner)
    second_task = PythonOperator(task_id="2", python_callable=cloud_runner)
    second_task_bis = PythonOperator(task_id="2bis", python_callable=cloud_runner)
    third_task = PythonOperator(task_id="3", python_callable=cloud_runner)

    first_task >> [second_task, second_task_bis] >> third_task
 

Выполнение следующей команды выполняет эту работу:

 airflow dags backfill my_id --start-date 2020-01-02
 

проблема:

Мое использование никогда не будет связано с каким-либо планированием / датой начала / датой окончания любого рода. Более того, мой DAG будет выполняться с сервера python Flask.

вопрос:

Есть ли способ достичь того же результата без воздушного потока? Или использовать воздушный поток в режиме только для запуска (без всей части планирования, airflow.db и т. Д.) В автономном скрипте python?

Спасибо

Ответ №1:

Воздушный поток-это одновременно библиотека и приложение. DAG не должны выполняться по расписанию. Вы можете запускать их по требованию с помощью API/CLI. Вы не можете запустить DAG (по расписанию или вручную), если приложение Airflow не запущено. Для работы Airflow требуется планировщик и база метаданных.

Чтобы ответить на ваш вопрос — Нет. Вы должны настроить и запустить воздушный поток, чтобы запустить DAG.