#airflow
#воздушный поток
Вопрос:
Как мне прочитать XCom предыдущего сообщения с помощью SimpleHttpsOperator, а затем решить выполнить задачу 2 в Airflow.
Предположим, у меня есть 3 задачи SimpleHttpsOperator, все задачи возвращают сообщение XCom, в значении XCom оно возвращает успех или неудачу в зависимости от результатов.
Итак, перед выполнением t2 я хочу проверить, был ли t1 успешным. Вся моя задача использует SimpleHttpsOperator
t1>> t2 >> t3
ниже приведен фрагмент моего кода:
t1 = SimpleHttpOperator(
task_id='t1',
http_conn_id='http_temp',
endpoint='update_data',
method='POST',
headers={"Content-Type":"application/json"},
xcom_push=True,
log_response=True,
dag=dag,
)
t2 = SimpleHttpOperator(
task_id='t2',
http_conn_id='http_temp',
endpoint='update_data',
method='POST',
headers={"Content-Type":"application/json"},
# response_check=lambda response: True if len(response.json()) == 0 else False,
xcom_push=True,
log_response=True,
dag=dag,
Ответ №1:
Вам придется использовать BranchPythonOperator. check_t1_status
amp; check_t2_status
в приведенной ниже цепочке зависимостей будет использоваться BranchPythonOperator
для проверки выходных данных предыдущей задачи с использованием xcom. И, основываясь на результатах, выполните следующую задачу или запустите фиктивный хвост, если предыдущая задача не удалась.
t1 >> check_t1_status >> t2 >> check_t2_status >> t3
check_t1_status >> t1_fail
check_t2_status >> t2_fail