Макрос воздушного потока считывается не так, как ожидалось

#python #airflow

Вопрос:

Мне нужно значение макроса воздушного потока, но возвращаемая строка читается не так, как ожидалось, все, что я получаю, — это сломанный DAG. Я протестировал часть сценария в терминале, чтобы проверить, не было ли что-то не так, но, похоже, это не так.

Я ожидаю, что строка типа «2016-06-28T16:51:45.978473-05:00» превратится в «2016-06-28T16:51»

Вот код. Эта часть находится перед областью действия декоратора DAG with DAG(..) as dag: .

 exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
 

Сообщение об ошибке:

 Broken DAG: [<path-to-dag>/processing_dag.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<path-to-dag>/processing_dag.py", line 16, in <module>
    exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
IndexError: list index out of range
 

Это означает, что я не попадаю '{{ execution_date }}' в формат, указанный в документах Airflow.

Запуск сценария DAG с сервера Airflow не активирует макросы, и DAG сломан, поэтому я не знаю, как отлаживать код. Есть ли способ распечатать значение '{{ execution_date }}' , чтобы я мог понять, что происходит?

[РЕДАКТИРОВАТЬ] Как и было запрошено, вот некоторые соответствующие части сценария. Импортированные модули являются:

 from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datamechanics_airflow_plugin.operator import DataMechanicsOperator
from airflow.models import Variable
from airflow.utils.dates import days_ago
from datetime import datetime
import pendulum
import re
 

В верхней части сценария:

 
local_tz = pendulum.timezone("America/Sao_Paulo")
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")


with DAG(
    dag_id="processing_dag",
    start_date=days_ago(0, second = 1).astimezone(tz=local_tz),
    schedule_interval="@daily",
) as dag:

  <tasks, etc>...



 

Комментарии:

1. поделитесь соответствующими частями вашей dag

2. Я отредактировал вопрос, чтобы показать более подробную информацию

3. где вы используете exec_date?

Ответ №1:

Шаблонные строки или макросы Jinja не оцениваются до выполнения задачи/DAG. Однако код верхнего уровня (он же логика, которая присутствует вне метода оператора execute() ) выполняется каждый интервал анализа файлов Планировщика.

Что здесь происходит, так это то, что эти две строки кода оценивают буквальную строку {{ execution_date }} , а не строку даты и времени, как вы ожидаете:

 exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
 

Ошибка, которая возникает, вызвана тем, что планировщик анализирует файл DAG и выполняет код верхнего уровня. Поскольку выражение регулярного выражения не возвращает результат, доступ к индексу завершается ошибкой, поскольку в литеральной строке отсутствует соответствующий шаблон строки регулярного {{ execution_date }} выражения .

В идеале выражение Jinja и связанная с ним логика должны быть частью a template_field , отмеченной в документации оператора, или в логике вызываемого оператора.