#python #airflow
#питон #воздушный поток
Вопрос:
Я работаю над интеграцией уведомлений о сбое при сбое задач для воздушного потока. Основываясь на других вопросах, заданных в SO по этой теме, у меня есть приведенный ниже код, выполняемый на моем узле воздушного потока. Он работает так, как задумывалось (одна задача не выполняется, другая выполняется успешно), за исключением того, что я не получаю слабое уведомление о невыполненной задаче. Я проверил свою способность публиковать слабые сообщения из Airflow с более общим кодом, который не ссылается на событие, которое должно запускаться. Любая помощь в том, почему это не работает, была бы признательна.
from airflow import DAG from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator from airflow.operators.python import PythonOperator from airflow.utils.timezone import datetime from airflow.hooks.base_hook import BaseHook def fail(): raise Exception("Task failed intentionally for testing purpose") def success(): print("success") SLACK_CONN_ID = 'slack_connection' def task_fail_slack_alert(context): slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID) slack_msg = """ :red_circle: Task Failed. *Task*: {task} *Dag*: {dag} *Execution Time*: {exec_date} *Log Url*: {log_url} """.format( task=context.get('task_instance').task_id, dag=context.get('task_instance').dag_id, ti=context.get('task_instance'), exec_date=context.get('execution_date'), log_url=context.get('task_instance').log_url, ) failed_alert = SlackWebhookOperator( task_id='slack_test', http_conn_id='slack_connection', webhook_token=slack_webhook_token, message=slack_msg, username='airflow') return failed_alert.execute(context=context) default_args = { 'owner': 'airlfow', 'depends_on_past': False, 'start_date': datetime(2020, 6, 6), 'retries': 0, 'on_failure_callback':task_fail_slack_alert } with DAG( 'slacktest3', default_args=default_args, description='slacktest', catchup=False, ) as dag: task_1 = PythonOperator( task_id="slack_notification_test", python_callable=fail ) task_2 = PythonOperator( task_id="slack_notification_test2", python_callable=success )
Ответ №1:
обновлена функция, и теперь работает оповещение. Все еще не уверен, почему исходный код этого не делает. Обновленный код ниже для справки…
from airflow import DAG from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator from airflow.operators.python import PythonOperator from airflow.utils.timezone import datetime from airflow.hooks.base_hook import BaseHook def fail(): raise Exception("Task failed intentionally for testing purpose") def success(): print("success") def slack_notification(context): slack_msg = """ :red_circle: Task Failed. *Task*: {task} *Dag*: {dag} *Execution Time*: {exec_date} *Log Url*: {log_url} """.format( task=context.get('task_instance').task_id, dag=context.get('task_instance').dag_id, ti=context.get('task_instance'), exec_date=context.get('execution_date'), log_url=context.get('task_instance').log_url, ) failed_alert = SlackWebhookOperator( task_id='slack_notification', http_conn_id='slack_connection', message=slack_msg) return failed_alert.execute(context=context) default_args = { 'owner': 'airlfow', 'depends_on_past': False, 'start_date': datetime(2020, 6, 6), 'retries': 0, 'on_failure_callback':slack_notification() } with DAG( 'slacktest3', default_args=default_args, description='slacktest', catchup=False, ) as dag: task_1 = PythonOperator( task_id="slack_notification_test", python_callable=fail ) task_2 = PythonOperator( task_id="slack_notification_test2", python_callable=success )