Не удается настроить проект GCP при использовании DataProcPySparkOperator

#python-3.x #google-cloud-platform #airflow #google-cloud-composer

#python-3.x #google-облачная платформа #воздушный поток #google-cloud-composer

Вопрос:

Я использую среду Cloud Composer для запуска рабочих процессов в проекте GCP. Один из моих рабочих процессов создает кластер обработки данных в другом проекте, используя DataprocClusterCreateOperator , а затем пытается отправить задание PySpark в этот кластер, используя DataProcPySparkOperator из airflow.contrib.operators.dataproc_operator модуля.

Чтобы создать кластер, я могу указать project_id параметр для его создания в другом проекте, но, похоже, DataProcPySparkOperator этот параметр игнорируется. Например, я ожидаю, что смогу передать project_id , но в итоге получаю 404 ошибку при выполнении задачи:

 from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

t1 = DataProcPySparkOperator(
  project_id='my-gcp-project',
  main='...',
  arguments=[...],
)
  

Как я могу использовать DataProcPySparkOperator для отправки задания в другой проект?

Ответ №1:

Модуль DataProcPySparkOperator из airflow.contrib.operators.dataproc_operator не принимает project_id kwarg в своем конструкторе, поэтому он всегда будет по умолчанию отправлять задания Dataproc в проект, в котором находится среда Cloud Composer. Если передается аргумент, то он игнорируется, что приводит к ошибке 404 при запуске задачи, потому что оператор попытается выполнить опрос для задания, используя неправильный путь к кластеру.

Одним из обходных путей является копирование оператора и перехвата и изменение его для принятия идентификатора проекта. Однако более простым решением является использование более новых операторов из airflow.providers пакетов, если вы используете версию Airflow, которая их поддерживает, потому что многие airflow.contrib операторы устарели в более новых выпусках Airflow.

Ниже приведен пример. Обратите внимание, что в этом модуле есть более новый DataprocSubmitPySparkJobOperator , но он устарел в пользу DataprocSubmitJobOperator . Итак, вам следует использовать последний, который принимает идентификатор проекта.

 from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

t1 = DataprocSubmitJobOperator(
  project_id='my-gcp-project-id',
  location='us-central1',
  job={...},
)
  

Если вы используете среду с Composer 1.10.5 , версией Airflow 1.10.6 и Python 3, поставщики предварительно установлены и могут быть использованы немедленно.

Комментарии:

1. Если мы хотим запустить простой скрипт на Python [в кластере Google dataproc], как будет определен аргумент «job» в DataprocSubmitJobOperator? Возможно ли это вообще?