#python #airflow
#питон #воздушный поток
Вопрос:
Я взломал эту концепцию, но, похоже, я не могу заставить ее работать.
У меня есть несколько баз данных, извлекающих информацию из базы данных. Они запланированы ежедневно, и я использую макросы для заполнения правильного формата даты Oracle для моих диапазонов дат. Это хорошо работает, если я хочу повторно запустить определенный экземпляр, поскольку используется правильная дата, если мне нужно что-то повторно запустить.
START = "{{ macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%y') }}"
END = "{{ macros.ds_format(next_ds, '%Y-%m-%d', '%d-%b-%y') }}"
date_range = (f"{START}", f"{END}")
Кажется, я не могу понять, как использовать приглашение «Trigger JSON», когда я вручную запускаю DAG для переопределения этих значений.
Я знаю, что он передается как контекст, и я должен быть в состоянии получить значения, но я не нашел четкой документации.
В конечном счете, я ожидаю повторного запуска базы данных и передачи даты начала и окончания JSON, чтобы переопределить мои настройки deafault.
# for example if I want all of December 2020 in a file the JSON would be the following
{"START":"01-DEC-20", "END":"31-DEC-20"}
это пример настройки DAG, в котором я передаю значения в оператор Python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
START = "{{ macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%y') }}"
END = "{{ macros.ds_format(next_ds, '%Y-%m-%d', '%d-%b-%y') }}"
date_range = (f"{START}", f"{END}")
def _print_context(**context):
print(context)
with DAG(
"example_parametrized_dag",
schedule_interval=None,
start_date=days_ago(2),
tags=["example"],
) as dag:
print_context = PythonOperator(
task_id="print_context",
python_callable=_print_context,
provide_context=True,
op_kwargs={
"parameters": date_range,
},
)
В идеальном мире я мог бы проверить, есть ли у меня переопределенные значения, но я не знаю, возможно ли это?
Ответ №1:
Ну, кажется, что написание вопроса заставило меня понять, каков был ответ…
Учитывая следующий JSON,
{"startdate":"01-DEC-20","enddate":"31-DEC-20"}
Я могу изменить свои переменные, чтобы проверить наличие этих значений, и соответствующим образом настроить свои результаты.
START = "{{ dag_run.conf['startdate'] if dag_run else macros.ds_format(ds, '%Y-%m-%d', '%d-%b-%y') }}"
END = "{{ dag_run.conf['enddate'] if dag_run else macros.ds_format(next_ds, '%Y-%m-%d', '%d-%b-%y') }}"
Раздражающе просто. и результаты будут отражены, если вы проверите данные журнала (и распечатанный контекст).