Как вы можете переопределить значения при запуске базы данных Airflow, используя Configuration JSON

#python #airflow

#питон #воздушный поток

Вопрос:

Я взломал эту концепцию, но, похоже, я не могу заставить ее работать.

У меня есть несколько баз данных, извлекающих информацию из базы данных. Они запланированы ежедневно, и я использую макросы для заполнения правильного формата даты Oracle для моих диапазонов дат. Это хорошо работает, если я хочу повторно запустить определенный экземпляр, поскольку используется правильная дата, если мне нужно что-то повторно запустить.

 START = "{{ macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%y') }}"
END = "{{ macros.ds_format(next_ds, '%Y-%m-%d', '%d-%b-%y') }}"
date_range = (f"{START}", f"{END}")
 

Кажется, я не могу понять, как использовать приглашение «Trigger JSON», когда я вручную запускаю DAG для переопределения этих значений.

Я знаю, что он передается как контекст, и я должен быть в состоянии получить значения, но я не нашел четкой документации.

В конечном счете, я ожидаю повторного запуска базы данных и передачи даты начала и окончания JSON, чтобы переопределить мои настройки deafault.

 # for example if I want all of December 2020 in a file the JSON would be the following
{"START":"01-DEC-20", "END":"31-DEC-20"}
 

это пример настройки DAG, в котором я передаю значения в оператор Python

 from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

START = "{{ macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%y') }}"
END = "{{ macros.ds_format(next_ds, '%Y-%m-%d', '%d-%b-%y') }}"
date_range = (f"{START}", f"{END}")


def _print_context(**context):
    print(context)


with DAG(
    "example_parametrized_dag",
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["example"],
) as dag:

    print_context = PythonOperator(
        task_id="print_context",
        python_callable=_print_context,
        provide_context=True,
        op_kwargs={
            "parameters": date_range,
        },
    )
 

В идеальном мире я мог бы проверить, есть ли у меня переопределенные значения, но я не знаю, возможно ли это?

Ответ №1:

Ну, кажется, что написание вопроса заставило меня понять, каков был ответ…

Учитывая следующий JSON,

 {"startdate":"01-DEC-20","enddate":"31-DEC-20"}
 

Я могу изменить свои переменные, чтобы проверить наличие этих значений, и соответствующим образом настроить свои результаты.

 START = "{{ dag_run.conf['startdate'] if dag_run else macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%y') }}"
END = "{{ dag_run.conf['enddate'] if dag_run else macros.ds_format(next_ds, '%Y-%m-%d', '%d-%b-%y') }}"
 

Раздражающе просто. и результаты будут отражены, если вы проверите данные журнала (и распечатанный контекст).