#python #airflow
Вопрос:
Я пытаюсь реализовать датчик внешних задач с помощью SmartSensors, но, поскольку он используется execution_date
для поиска другого статуса DAG, я, похоже, не смогу его передать, если я опущу его из своего датчика SmartExternalSensor, он говорит, что есть KeyError
с execution_date, так как он не существует.
Я попытался переопределить get_poke_context
метод
def get_poke_context(self, context):
result = super().get_poke_context(context)
if self.execution_date is None:
result['execution_date'] = context['execution_date']
return result
но теперь в нем говорится, что объект datetime не сериализуется в формате json (это делается при регистрации датчика в качестве SmartSensor с помощью json.dumps) и работает как обычный датчик. Если я передам напрямую строку этого объекта datetime, в ней будет указано, что str object has no isoformat() method
, таким образом, я знаю, что дата выполнения должна быть объектом datetime.
У вас, ребята, есть какие-нибудь идеи о том, как это обойти?
Комментарии:
1. как насчет преобразования вашей строки в объект DateTime перед использованием в качестве даты выполнения?
2. @KaBoom, я получаю ошибку
datetime object is not JSON serializable
Ответ №1:
Я получаю аналогичные проблемы, пытаясь использовать датчик внешних задач в качестве сенсора SmartSensor. Это ниже не было тщательно протестировано, но, похоже, работает.
import datetime
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.session import provide_session
class SmartExternalTaskSensor(ExternalTaskSensor):
# Something a bit odd happens with ExternalTaskSensor when run as a smart
# sensor. ExternalTaskSensor requires execution_date in the poke context,
# but the smart sensor system passes all poke context values to the
# constructor of ExternalTaskSensor, but it doesn't allow execution_date
# as an argument. So we add it...
def __init__(self, execution_date=None, **kwargs):
super().__init__(**kwargs)
def get_poke_context(self, context):
return {
'external_dag_id': self.external_dag_id,
'external_task_id': self.external_task_id,
'timeout': self.timeout,
'check_existence': self.check_existence,
# ... but execution_date has to be manually extracted from the
# context, and converted to a string, since it will be JSON
# encoded by the smart sensor system...
'execution_date': context['execution_date'].isoformat(),
}
@provide_session
def poke(self, context, session=None):
return super().poke(
{
**context,
# ... and then converted back to a datetime object since
# that's what ExternalTaskSensor poke needs
'execution_date': datetime.datetime.fromisoformat(
context['execution_date']
),
},
session,
)