#amazon-web-services #airflow
#amazon-веб-сервисы #воздушный поток
Вопрос:
У меня есть база данных со многими задачами, которые выполняются параллельно.
- Создается кластер EMR
- Представлено несколько шагов EMR,
- Для каждого
EMRStepSensor
настроеноEmrAddStepsOperator
ожидание результата шага.
Я хотел бы отправить, чтобы отправить сообщение SNS, если какой-либо из шагов не удался. Я видел несколько подходов, например, использование другого оператора ( SnsPublishOperator
) со свойством, вызываемым trigger_rule="all_done"
. Я пробовал что-то вроде этого:
def get_sns_operator(self, emr_env):
return SnsPublishOperator(
target_arn=emr_env['snstopic'],
message="Foo",
subject="Report of execution",
task_id="sns_notification",
trigger_rule="all_done",
dag=self.dag
)
def define_workflow(self):
common_args = CommonArgs(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"), other_args=self.other_args)
cluster_creator = EmrCreateJobFlowOperator(
dag=self.dag,
task_id='create_cluster',
job_flow_overrides=common_args.jobflow_overrides_args(),
aws_conn_id='aws_default',
emr_conn_id='emr_default',
region_name='eu-west-1'
)
tables = Variable.get("consolidation", deserialize_json=True).get(self.other_args['system']).get('tables')
sns_operator = self.get_sns_operator(emr_env=Variable.get("consolidation", deserialize_json=True).get("emr"))
for table in tables:
step_args = copy.copy(self.other_args)
step_args['table'] = table
step_adder = EmrAddStepsOperator(
dag=self.dag,
task_id='step_{table}'.format(table=table),
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
aws_conn_id='aws_default',
steps=[common_args.step_args(**step_args)]
)
step_checker = EmrStepSensor(
dag=self.dag,
task_id='watch_step_{table}'.format(table=table),
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='step_" table "', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(sns_operator)
return self.dag
И, по сути, задача создана успешно. Но мне интересно, как передать информационный статус или получить его из watch_steps...
после того, как все они будут завершены, и отправить сообщение на всякий случай, если есть ошибка.
Любая помощь будет оценена. Спасибо.
Ответ №1:
Похоже, вам действительно нужно one_failed
значение для вашего trigger_rule
— таким образом, вы получите уведомление только в случае сбоя какой-либо из вышестоящих задач, и вам не нужно анализировать какой-либо статус: получено уведомление, что-то нужно исправить.
Вы можете прочитать больше о различных правилах запуска здесь:https://airflow.apache.org/docs/stable/concepts.html#trigger-rules
Комментарии:
1. Но я хотел бы получить отчет после завершения всех задач.
2. Затем создайте вторую задачу, которая выполняет то же самое, но со значением по умолчанию
trigger_rule
, которое ожидает успешного завершения всех восходящих потоков.