#python #airflow
Вопрос:
Мне нужно значение макроса воздушного потока, но возвращаемая строка читается не так, как ожидалось, все, что я получаю, — это сломанный DAG. Я протестировал часть сценария в терминале, чтобы проверить, не было ли что-то не так, но, похоже, это не так.
Я ожидаю, что строка типа «2016-06-28T16:51:45.978473-05:00» превратится в «2016-06-28T16:51»
Вот код. Эта часть находится перед областью действия декоратора DAG with DAG(..) as dag:
.
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
Сообщение об ошибке:
Broken DAG: [<path-to-dag>/processing_dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<path-to-dag>/processing_dag.py", line 16, in <module>
exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
IndexError: list index out of range
Это означает, что я не попадаю '{{ execution_date }}'
в формат, указанный в документах Airflow.
Запуск сценария DAG с сервера Airflow не активирует макросы, и DAG сломан, поэтому я не знаю, как отлаживать код. Есть ли способ распечатать значение '{{ execution_date }}'
, чтобы я мог понять, что происходит?
[РЕДАКТИРОВАТЬ] Как и было запрошено, вот некоторые соответствующие части сценария. Импортированные модули являются:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datamechanics_airflow_plugin.operator import DataMechanicsOperator
from airflow.models import Variable
from airflow.utils.dates import days_ago
from datetime import datetime
import pendulum
import re
В верхней части сценария:
local_tz = pendulum.timezone("America/Sao_Paulo")
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
with DAG(
dag_id="processing_dag",
start_date=days_ago(0, second = 1).astimezone(tz=local_tz),
schedule_interval="@daily",
) as dag:
<tasks, etc>...
Комментарии:
1. поделитесь соответствующими частями вашей dag
2. Я отредактировал вопрос, чтобы показать более подробную информацию
3. где вы используете exec_date?
Ответ №1:
Шаблонные строки или макросы Jinja не оцениваются до выполнения задачи/DAG. Однако код верхнего уровня (он же логика, которая присутствует вне метода оператора execute()
) выполняется каждый интервал анализа файлов Планировщика.
Что здесь происходит, так это то, что эти две строки кода оценивают буквальную строку {{ execution_date }}
, а не строку даты и времени, как вы ожидаете:
exec_date = re.findall(r"^[d]{4}-[d]{2}-[d]{2}T[d]{2}:[d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
Ошибка, которая возникает, вызвана тем, что планировщик анализирует файл DAG и выполняет код верхнего уровня. Поскольку выражение регулярного выражения не возвращает результат, доступ к индексу завершается ошибкой, поскольку в литеральной строке отсутствует соответствующий шаблон строки регулярного {{ execution_date }}
выражения .
В идеале выражение Jinja и связанная с ним логика должны быть частью a template_field
, отмеченной в документации оператора, или в логике вызываемого оператора.