Как обрабатывать ошибки для воздушного потока (DatabricksSubmitRunOperator)

#airflow #azure-databricks

Вопрос:

У меня есть dag Airflow , который запускает серию записных книжек Databricks.

Теперь я хочу знать , не провалился ли кто-нибудь из записных книжек? Как отправить пользователю сообщение о том, что эта записная книжка вышла из строя, с небольшим количеством деталей, таких как дата выполнения.

и есть ли какой — либо способ обработки ошибок?

Комментарии:

1. Добавьте свой код DAG

Ответ №1:

В BaseOperator нем есть параметры, которые позволяют настроить отправку электронной почты в случае сбоя, таким образом, он доступен всем операторам:

 DatabricksSubmitRunOperator(...,email_on_failure=True,
                            email='your.email@where.com')
 

Ответ №2:

Шаг 1: Установите email_on_failure False и используйте операторы on_failure_callback . on_failure_callback функция, описанная ниже.

 from airflow.utils.email import send_email

def notify_email(contextDict, **kwargs):
    """Send custom email alerts."""

    # email title.
    title = "Airflow alert: {task_name} Failed".format(**contextDict)

    # email contents
    body = """
    Hi Everyone, <br>
    <br>
    There's been an error in the {task_name} job.<br>
    <br>
    Forever yours,<br>
    Airflow bot <br>
    """.format(**contextDict)

    send_email('you_email@address.com', title, body)
 

Шаг 2: Краткий пример dag воздушного потока ниже

 from airflow.models import DAG
from airflow.operators import PythonOperator
from airflow.utils.dates import days_ago

args = {
  'owner': 'me',
  'description': 'my_example',
  'start_date': days_ago(1)
}

# run every day at 12:05 UTC
dag = DAG(dag_id='example_dag', default_args=args, schedule_interval='0 5 * * *')

def print_hello():
  return 'hello!'

py_task = PythonOperator(task_id='example',
                         python_callable=print_hello,
                         on_failure_callback=notify_email,
                         dag=dag)

py_task
 

Обратите внимание, где установлено on_failure_callback значение, равное notify_email в PythonOperator .