#airflow #azure-databricks
Вопрос:
У меня есть dag Airflow , который запускает серию записных книжек Databricks.
Теперь я хочу знать , не провалился ли кто-нибудь из записных книжек? Как отправить пользователю сообщение о том, что эта записная книжка вышла из строя, с небольшим количеством деталей, таких как дата выполнения.
и есть ли какой — либо способ обработки ошибок?
Комментарии:
1. Добавьте свой код DAG
Ответ №1:
В BaseOperator
нем есть параметры, которые позволяют настроить отправку электронной почты в случае сбоя, таким образом, он доступен всем операторам:
DatabricksSubmitRunOperator(...,email_on_failure=True,
email='your.email@where.com')
Ответ №2:
Шаг 1: Установите email_on_failure
False
и используйте операторы on_failure_callback
. on_failure_callback
функция, описанная ниже.
from airflow.utils.email import send_email
def notify_email(contextDict, **kwargs):
"""Send custom email alerts."""
# email title.
title = "Airflow alert: {task_name} Failed".format(**contextDict)
# email contents
body = """
Hi Everyone, <br>
<br>
There's been an error in the {task_name} job.<br>
<br>
Forever yours,<br>
Airflow bot <br>
""".format(**contextDict)
send_email('you_email@address.com', title, body)
Шаг 2: Краткий пример dag воздушного потока ниже
from airflow.models import DAG
from airflow.operators import PythonOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'me',
'description': 'my_example',
'start_date': days_ago(1)
}
# run every day at 12:05 UTC
dag = DAG(dag_id='example_dag', default_args=args, schedule_interval='0 5 * * *')
def print_hello():
return 'hello!'
py_task = PythonOperator(task_id='example',
python_callable=print_hello,
on_failure_callback=notify_email,
dag=dag)
py_task
Обратите внимание, где установлено on_failure_callback
значение, равное notify_email
в PythonOperator
.