Как использовать датчик внешних задач воздушного потока в качестве интеллектуального датчика?

#python #airflow

Вопрос:

Я пытаюсь реализовать датчик внешних задач с помощью SmartSensors, но, поскольку он используется execution_date для поиска другого статуса DAG, я, похоже, не смогу его передать, если я опущу его из своего датчика SmartExternalSensor, он говорит, что есть KeyError с execution_date, так как он не существует.

Я попытался переопределить get_poke_context метод

     def get_poke_context(self, context):
        result = super().get_poke_context(context)

        if self.execution_date is None:
            result['execution_date'] = context['execution_date']

        return result
 

но теперь в нем говорится, что объект datetime не сериализуется в формате json (это делается при регистрации датчика в качестве SmartSensor с помощью json.dumps) и работает как обычный датчик. Если я передам напрямую строку этого объекта datetime, в ней будет указано, что str object has no isoformat() method , таким образом, я знаю, что дата выполнения должна быть объектом datetime.

У вас, ребята, есть какие-нибудь идеи о том, как это обойти?

Комментарии:

1. как насчет преобразования вашей строки в объект DateTime перед использованием в качестве даты выполнения?

2. @KaBoom, я получаю ошибку datetime object is not JSON serializable

Ответ №1:

Я получаю аналогичные проблемы, пытаясь использовать датчик внешних задач в качестве сенсора SmartSensor. Это ниже не было тщательно протестировано, но, похоже, работает.

 import datetime

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.session import provide_session


class SmartExternalTaskSensor(ExternalTaskSensor):
    # Something a bit odd happens with ExternalTaskSensor when run as a smart
    # sensor. ExternalTaskSensor requires execution_date in the poke context,
    # but the smart sensor system passes all poke context values to the
    # constructor of ExternalTaskSensor, but it doesn't allow execution_date
    # as an argument. So we add it...
    def __init__(self, execution_date=None, **kwargs):
        super().__init__(**kwargs)

    def get_poke_context(self, context):
        return {
            'external_dag_id': self.external_dag_id,
            'external_task_id': self.external_task_id,
            'timeout': self.timeout,
            'check_existence': self.check_existence,
            # ... but execution_date has to be manually extracted from the
            # context, and converted to a string, since it will be JSON
            # encoded by the smart sensor system...
            'execution_date': context['execution_date'].isoformat(),
        }

    @provide_session
    def poke(self, context, session=None):
        return super().poke(
            {
                **context,
                # ... and then converted back to a datetime object since
                # that's what ExternalTaskSensor poke needs
                'execution_date': datetime.datetime.fromisoformat(
                    context['execution_date']
                ),
            },
            session,
        )