Уведомления о сбоях в работе воздушного потока

#python #airflow

#питон #воздушный поток

Вопрос:

Я работаю над интеграцией уведомлений о сбое при сбое задач для воздушного потока. Основываясь на других вопросах, заданных в SO по этой теме, у меня есть приведенный ниже код, выполняемый на моем узле воздушного потока. Он работает так, как задумывалось (одна задача не выполняется, другая выполняется успешно), за исключением того, что я не получаю слабое уведомление о невыполненной задаче. Я проверил свою способность публиковать слабые сообщения из Airflow с более общим кодом, который не ссылается на событие, которое должно запускаться. Любая помощь в том, почему это не работает, была бы признательна.

 from airflow import DAG from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator from airflow.operators.python import PythonOperator from airflow.utils.timezone import datetime from airflow.hooks.base_hook import BaseHook   def fail():  raise Exception("Task failed intentionally for testing purpose")  def success():  print("success")  SLACK_CONN_ID = 'slack_connection'  def task_fail_slack_alert(context):  slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID)  slack_msg = """  :red_circle: Task Failed.   *Task*: {task}   *Dag*: {dag}   *Execution Time*: {exec_date}   *Log Url*: {log_url}   """.format(  task=context.get('task_instance').task_id,  dag=context.get('task_instance').dag_id,  ti=context.get('task_instance'),  exec_date=context.get('execution_date'),  log_url=context.get('task_instance').log_url,  )  failed_alert = SlackWebhookOperator(  task_id='slack_test',  http_conn_id='slack_connection',  webhook_token=slack_webhook_token,  message=slack_msg,  username='airflow')  return failed_alert.execute(context=context)   default_args = {  'owner': 'airlfow',  'depends_on_past': False,  'start_date': datetime(2020, 6, 6),  'retries': 0,  'on_failure_callback':task_fail_slack_alert }  with DAG(  'slacktest3',  default_args=default_args,  description='slacktest',  catchup=False,   ) as dag:  task_1 = PythonOperator(  task_id="slack_notification_test",  python_callable=fail  )   task_2 = PythonOperator(  task_id="slack_notification_test2",  python_callable=success  )  

Ответ №1:

обновлена функция, и теперь работает оповещение. Все еще не уверен, почему исходный код этого не делает. Обновленный код ниже для справки…

 from airflow import DAG from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator from airflow.operators.python import PythonOperator from airflow.utils.timezone import datetime from airflow.hooks.base_hook import BaseHook    def fail():  raise Exception("Task failed intentionally for testing purpose")  def success():  print("success")  def slack_notification(context):  slack_msg = """  :red_circle: Task Failed.   *Task*: {task}   *Dag*: {dag}   *Execution Time*: {exec_date}   *Log Url*: {log_url}   """.format(  task=context.get('task_instance').task_id,  dag=context.get('task_instance').dag_id,  ti=context.get('task_instance'),  exec_date=context.get('execution_date'),  log_url=context.get('task_instance').log_url,  )  failed_alert = SlackWebhookOperator(  task_id='slack_notification',  http_conn_id='slack_connection',  message=slack_msg)  return failed_alert.execute(context=context)   default_args = {  'owner': 'airlfow',  'depends_on_past': False,  'start_date': datetime(2020, 6, 6),  'retries': 0,  'on_failure_callback':slack_notification() }  with DAG(  'slacktest3',  default_args=default_args,  description='slacktest',  catchup=False,   ) as dag:  task_1 = PythonOperator(  task_id="slack_notification_test",  python_callable=fail  )   task_2 = PythonOperator(  task_id="slack_notification_test2",  python_callable=success  )