Используйте operator для неудачных задач Airflow

#amazon-web-services #airflow

#amazon-веб-сервисы #воздушный поток

Вопрос:

У меня есть база данных со многими задачами, которые выполняются параллельно.

введите описание изображения здесь

  • Создается кластер EMR
  • Представлено несколько шагов EMR,
  • Для каждого EMRStepSensor настроено EmrAddStepsOperator ожидание результата шага.

Я хотел бы отправить, чтобы отправить сообщение SNS, если какой-либо из шагов не удался. Я видел несколько подходов, например, использование другого оператора ( SnsPublishOperator ) со свойством, вызываемым trigger_rule="all_done" . Я пробовал что-то вроде этого:

 def get_sns_operator(self, emr_env):
    return SnsPublishOperator(
        target_arn=emr_env['snstopic'],
        message="Foo",
        subject="Report of execution",
        task_id="sns_notification",
        trigger_rule="all_done",
        dag=self.dag
    )


def define_workflow(self):
    common_args = CommonArgs(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"), other_args=self.other_args)
    cluster_creator = EmrCreateJobFlowOperator(
        dag=self.dag,
        task_id='create_cluster',
        job_flow_overrides=common_args.jobflow_overrides_args(),
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        region_name='eu-west-1'
    )
    tables = Variable.get("consolidation", deserialize_json=True).get(self.other_args['system']).get('tables')
    sns_operator = self.get_sns_operator(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"))
    for table in tables:
        step_args = copy.copy(self.other_args)
        step_args['table'] = table
        step_adder = EmrAddStepsOperator(
            dag=self.dag,
            task_id='step_{table}'.format(table=table),
            job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
            aws_conn_id='aws_default',
            steps=[common_args.step_args(**step_args)]
        )

        step_checker = EmrStepSensor(
            dag=self.dag,
            task_id='watch_step_{table}'.format(table=table),
            job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull(task_ids='step_"   table   "', key='return_value')[0] }}",
            aws_conn_id='aws_default',
        )
        cluster_creator.set_downstream(step_adder)
        step_adder.set_downstream(step_checker)
        step_checker.set_downstream(sns_operator)
    return self.dag
  

И, по сути, задача создана успешно. Но мне интересно, как передать информационный статус или получить его из watch_steps... после того, как все они будут завершены, и отправить сообщение на всякий случай, если есть ошибка.

введите описание изображения здесь

Любая помощь будет оценена. Спасибо.

Ответ №1:

Похоже, вам действительно нужно one_failed значение для вашего trigger_rule — таким образом, вы получите уведомление только в случае сбоя какой-либо из вышестоящих задач, и вам не нужно анализировать какой-либо статус: получено уведомление, что-то нужно исправить.

Вы можете прочитать больше о различных правилах запуска здесь:https://airflow.apache.org/docs/stable/concepts.html#trigger-rules

Комментарии:

1. Но я хотел бы получить отчет после завершения всех задач.

2. Затем создайте вторую задачу, которая выполняет то же самое, но со значением по умолчанию trigger_rule , которое ожидает успешного завершения всех восходящих потоков.