#airflow
#воздушный поток
Вопрос:
При сбое задачи возможно ли извлечь значение XCOM, которое было ранее установлено в другой задаче во время выполнения on_failure_callback?
Чтобы быть более конкретным, пример:
dag: task1 >> task2
- задача 1 успешно выполняется и устанавливается
key="test"
value=123
в Xcom - задача 2 завершается с ошибкой
- вызывается on_failure_callback
Возможно ли получить значение ключа test
в on_failure_callback ?
Я пробовал так, но, похоже, он не нашел никакого значения:
# Daf configuration
...
"on_failure_callback": deploy_failure,
...
# In task1
kwargs["ti"].xcom_push(key="test", value=123)
# on_failure_callback method
def deploy_failure(context):
print("/! Deploy failure callback triggered...")
test_value = context.get("ti").xcom_pull(key="test")
print(test_value)
test_value равно None
Я уверен, что значение Xcom установлено, потому что я вижу его на серверной части воздушного потока.
Есть идеи?
Ответ №1:
Я предполагаю, что есть какая-то проблема с provide_context
в failure_callback. Вы можете обойти это, обратившись непосредственно к классу XCom:
from airflow.models import XCom
def deploy_failure(context):
print("/! Deploy failure callback triggered...")
test_value = XCom.get_one(execution_date = context.get('execution_date'), key='test')
print("ALERT: {0}".format(test_value))
Комментарии:
1. Сработало!! Но как протестировать (например, макет XCom)?