Как извлечь значение XCOM из on_failure_callback

#airflow

#воздушный поток

Вопрос:

При сбое задачи возможно ли извлечь значение XCOM, которое было ранее установлено в другой задаче во время выполнения on_failure_callback?

Чтобы быть более конкретным, пример:

 dag: task1 >> task2
  
  • задача 1 успешно выполняется и устанавливается key="test" value=123 в Xcom
  • задача 2 завершается с ошибкой
  • вызывается on_failure_callback

Возможно ли получить значение ключа test в on_failure_callback ?

Я пробовал так, но, похоже, он не нашел никакого значения:

 # Daf configuration
    ...
    "on_failure_callback": deploy_failure,
    ...

# In task1
    kwargs["ti"].xcom_push(key="test", value=123)

# on_failure_callback method
def deploy_failure(context):
    print("/! Deploy failure callback triggered...")
    test_value = context.get("ti").xcom_pull(key="test")
    print(test_value)
  

test_value равно None

Я уверен, что значение Xcom установлено, потому что я вижу его на серверной части воздушного потока.

Есть идеи?

Ответ №1:

Я предполагаю, что есть какая-то проблема с provide_context в failure_callback. Вы можете обойти это, обратившись непосредственно к классу XCom:

 from airflow.models import XCom

def deploy_failure(context):
    print("/! Deploy failure callback triggered...")
    test_value = XCom.get_one(execution_date = context.get('execution_date'), key='test')
    print("ALERT: {0}".format(test_value))

  

Комментарии:

1. Сработало!! Но как протестировать (например, макет XCom)?