#python-3.x #airflow
Вопрос:
Я вижу следующую ошибку при попытке запустить задачу t1 (также ранее мы использовали BigQueryOperator, но теперь пытаемся заставить работать BigQueryInsertJobOperator, и именно тогда это начало происходить:
Файл «», строка 1, в верхнем уровне кода шаблона файл «/usr/местные/воздушный поток/РЕПО/групп/article_traffic/SQL и/bigquery_extract.в SQL», строка 2, в верхнем уровне кода шаблона {% по вертикали в get_verticals(execution_date) %} файл «/usr/local/lib/python3.8/site-packages/jinja2/runtime.py», строка 545, в следующем р. = следующий(самовыдвижение._iterator) файл «/usr/local/airflow/repo/dags/article_traffic/utils.py», линия 28, в метод isactive вернуться ФДТ <= execdt <= ТДТ ошибку TypeError: не могу сравнить смещения-наивный и смещения известны даты и время
ниже приведен наш код с задачей t1:
def extract_from_bigquery (parent_dag_name, child_dag_name, start_date, schedule_interval):
dag = DAG(
f'{parent_dag_name}.{child_dag_name}',
schedule_interval=schedule_interval,
start_date=start_date,
user_defined_macros={
'get_verticals': util.get_activeverticals,
},
)
t1 = BigQueryInsertJobOperator(
dag=dag,
task_id='bq_query',
gcp_conn_id='google_cloud_default',
params={'data': util.querycontext},
configuration={
"query": {"query": "{% include 'sql/bigquery_extract.sql' %}"}
}
)
get_activeverticals-это метод ниже, который выдает ошибку:
def get_activeverticals(self, execdt):
def isactive(v):
fdt = datetime(*[int(dpart) for dpart in v.get('from', '1980-01-01').split('-')])
tdt = datetime(*[int(dpart) for dpart in v.get('to', '3000-01-01').split('-')])
return fdt <= execdt <= tdt
active_verticals = filter(isactive, self.querycontext['verticals'])
return active_verticals
Ответ №1:
Стандарт Python datetime()
по умолчанию создает наивные объекты datetime. Воздушный поток использует объекты datetime с поддержкой.
Я подозреваю, что datetime()
в вашем get_activeverticals()
импортируется из стандарта Python datetime
. Итак, в fdt <= execdt <= tdt
вы сравниваете наивные ( fdt
, tdt
) и осведомленные ( execdt
) объекты даты и времени. Это запрещено, поэтому вы получаете ошибку этого типа.
Быстрое решение проблемы состояло бы в том, чтобы datetime()
вместо этого использовать маятник, который создаст объект datetime с поддержкой:
from pendulum import datetime