использование воздушного потока — не удается сравнить время смещения-наивное и с учетом смещения

#python-3.x #airflow

Вопрос:

Я вижу следующую ошибку при попытке запустить задачу t1 (также ранее мы использовали BigQueryOperator, но теперь пытаемся заставить работать BigQueryInsertJobOperator, и именно тогда это начало происходить:

Файл «», строка 1, в верхнем уровне кода шаблона файл «/usr/местные/воздушный поток/РЕПО/групп/article_traffic/SQL и/bigquery_extract.в SQL», строка 2, в верхнем уровне кода шаблона {% по вертикали в get_verticals(execution_date) %} файл «/usr/local/lib/python3.8/site-packages/jinja2/runtime.py», строка 545, в следующем р. = следующий(самовыдвижение._iterator) файл «/usr/local/airflow/repo/dags/article_traffic/utils.py», линия 28, в метод isactive вернуться ФДТ <= execdt <= ТДТ ошибку TypeError: не могу сравнить смещения-наивный и смещения известны даты и время

ниже приведен наш код с задачей t1:

 def extract_from_bigquery (parent_dag_name, child_dag_name, start_date, schedule_interval):
    dag = DAG(
        f'{parent_dag_name}.{child_dag_name}',
        schedule_interval=schedule_interval,
        start_date=start_date,
        user_defined_macros={
            'get_verticals': util.get_activeverticals,
        },
    )


    t1 = BigQueryInsertJobOperator(
        dag=dag,
        task_id='bq_query',
        gcp_conn_id='google_cloud_default',
        params={'data': util.querycontext},
        configuration={
            "query": {"query": "{% include 'sql/bigquery_extract.sql' %}"}
             }
    )
 

get_activeverticals-это метод ниже, который выдает ошибку:

 def get_activeverticals(self, execdt):

    def isactive(v):
        fdt = datetime(*[int(dpart) for dpart in v.get('from', '1980-01-01').split('-')])
        tdt = datetime(*[int(dpart) for dpart in v.get('to', '3000-01-01').split('-')])

        return fdt <= execdt <= tdt

    active_verticals = filter(isactive, self.querycontext['verticals'])

    return active_verticals
 

Ответ №1:

Стандарт Python datetime() по умолчанию создает наивные объекты datetime. Воздушный поток использует объекты datetime с поддержкой.

Я подозреваю, что datetime() в вашем get_activeverticals() импортируется из стандарта Python datetime . Итак, в fdt <= execdt <= tdt вы сравниваете наивные ( fdt , tdt ) и осведомленные ( execdt ) объекты даты и времени. Это запрещено, поэтому вы получаете ошибку этого типа.

Быстрое решение проблемы состояло бы в том, чтобы datetime() вместо этого использовать маятник, который создаст объект datetime с поддержкой:

 from pendulum import datetime