Оператор ЭМИ с поврежденным воздушным потоком Apache DAG выходит из строя

#amazon-web-services #amazon-emr #mwaa

Вопрос:

Я создаю DAG в среде управляемых рабочих процессов воздушного потока apache. DAG в основном использует оператор EMR , он создает кластер EMR, запускает задание spark, определенное в DAG, и удаляет кластер EMR. Я взял код с веб-сайта airflow. Это ссылка для code-https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.html

Сообщение об ошибке:[Создается кластер EMR, но ниже показано, связана ли ошибка с воздушным потоком]

 2021-08-25 05:00:04,520] {{logging_mixin.py:104}} INFO - [2021-08-25 05:00:04,520] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to skipped. Terminating instance.
[2021-08-25 05:00:04,600] {{process_utils.py:100}} INFO - Sending Signals.SIGTERM to GPID 1897
[2021-08-25 05:00:04,621] {{taskinstance.py:1265}} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-08-25 05:00:04,702] {{taskinstance.py:1482}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base.py", line 243, in execute
    time.sleep(self._get_next_poke_interval(started_at, run_duration, try_number))
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1267, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-08-25 05:00:04,725] {{taskinstance.py:1532}} INFO - Marking task as FAILED. dag_id=emr_job_flow_manual_steps_dag, task_id=watch_step, execution_date=20210824T030000, start_date=20210825T030008, end_date=20210825T050004
[2021-08-25 05:00:04,793] {{process_utils.py:66}} INFO - Process psutil.Process(pid=1897, status='terminated', exitcode=1, started='03:00:07') (1897) terminated with exit code 1
 

Мой класс среды воздушного потока:mw1.малый

Ответ №1:

Похоже, что у вашего DAG просто истекло время ожидания через 2 часа:

 start_date=20210825T030008, end_date=20210825T050004
 

К сожалению, интеграция MWAA с другими сервисами AWS недостаточно хорошо документирована, но я предполагаю, что роль выполнения среды MWAA не имеет разрешений на управление кластером EMR.

Проверьте роль, с которой работает MWAA, и назначьте необходимые разрешения, как описано здесь:

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr.html

Чтобы успешно выполнить 2 примера, вам необходимо создать роли службы IAM (EMR_EC2_DefaultRole и EMR_DefaultRole) для Amazon EMR.

Вы можете создать эти роли с помощью интерфейса командной строки AWS: aws emr создать-роли по умолчанию

Комментарии:

1. Привет, Яцек, Большое тебе спасибо за ответ. У меня уже есть обе роли.