#python-3.x #google-cloud-platform #airflow #google-cloud-composer
#python-3.x #google-облачная платформа #воздушный поток #google-cloud-composer
Вопрос:
Я использую среду Cloud Composer для запуска рабочих процессов в проекте GCP. Один из моих рабочих процессов создает кластер обработки данных в другом проекте, используя DataprocClusterCreateOperator
, а затем пытается отправить задание PySpark в этот кластер, используя DataProcPySparkOperator
из airflow.contrib.operators.dataproc_operator
модуля.
Чтобы создать кластер, я могу указать project_id
параметр для его создания в другом проекте, но, похоже, DataProcPySparkOperator
этот параметр игнорируется. Например, я ожидаю, что смогу передать project_id
, но в итоге получаю 404
ошибку при выполнении задачи:
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
t1 = DataProcPySparkOperator(
project_id='my-gcp-project',
main='...',
arguments=[...],
)
Как я могу использовать DataProcPySparkOperator
для отправки задания в другой проект?
Ответ №1:
Модуль DataProcPySparkOperator
из airflow.contrib.operators.dataproc_operator
не принимает project_id
kwarg в своем конструкторе, поэтому он всегда будет по умолчанию отправлять задания Dataproc в проект, в котором находится среда Cloud Composer. Если передается аргумент, то он игнорируется, что приводит к ошибке 404 при запуске задачи, потому что оператор попытается выполнить опрос для задания, используя неправильный путь к кластеру.
Одним из обходных путей является копирование оператора и перехвата и изменение его для принятия идентификатора проекта. Однако более простым решением является использование более новых операторов из airflow.providers
пакетов, если вы используете версию Airflow, которая их поддерживает, потому что многие airflow.contrib
операторы устарели в более новых выпусках Airflow.
Ниже приведен пример. Обратите внимание, что в этом модуле есть более новый DataprocSubmitPySparkJobOperator
, но он устарел в пользу DataprocSubmitJobOperator
. Итак, вам следует использовать последний, который принимает идентификатор проекта.
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
t1 = DataprocSubmitJobOperator(
project_id='my-gcp-project-id',
location='us-central1',
job={...},
)
Если вы используете среду с Composer 1.10.5 , версией Airflow 1.10.6 и Python 3, поставщики предварительно установлены и могут быть использованы немедленно.
Комментарии:
1. Если мы хотим запустить простой скрипт на Python [в кластере Google dataproc], как будет определен аргумент «job» в DataprocSubmitJobOperator? Возможно ли это вообще?