#google-cloud-storage #airflow
#google-облачное хранилище #поток воздуха
Вопрос:
Я добавляю задачу в базу данных airflow следующим образом:
examples_task = KubernetesPodOperator(
task_id='examples_generation',
dag=dag,
namespace='test',
image='test_amazon_image',
name='pipe-labelled-examples-generation-tf-record-operator',
env={
'GOOGLE_APPLICATION_CREDENTIALS': Variable.get('google_cloud_credentials')
},
arguments=[
"--assets_path", Variable.get('assets_path'),
"--folder_source", Variable.get('folder_source'),
"--folder_destination", Variable.get('folder_destination'),
"--gcs_folder_destination", Variable.get('gcs_folder_destination'),
"--aws_region", Variable.get('aws_region'),
"--s3_endpoint", Variable.get('s3_endpoint')
],
get_logs=True)
Я думал, что смогу вставить файл service account json в качестве переменной и вызвать его, но это не работает, и документация airflow / Google неясна. Как вы это делаете?
Комментарии:
1. Вы пытались использовать соединение Airflow в пользовательском интерфейсе, а затем добавить имя соединения в операторе?
2. Я не пробовал этого, так как это был независимый от airflow интерфейс командной строки python, который я хочу использовать для независимого запуска kubernetes Pod operator. Я изучу это.
3. Кроме того, ваша конфигурация не может работать, потому
GOOGLE_APPLICATION_CREDENTIALS
что переменная должна ссылаться на файл, а не непосредственно на содержимое файла.4. Да, я так и подумал.
5. Есть идеи о наилучшем способе реализации этого?
Ответ №1:
Решения для переноса json в аргумент
examples_task = KubernetesPodOperator(
task_id='examples_generation',
dag=dag,
namespace='test',
image='test_amazon_image',
name='pipe-labelled-examples-generation-tf-record-operator',
arguments=[
"--folder_source", Variable.get('folder_source'),
"--folder_destination", Variable.get('folder_destination'),
"--gcs_folder_destination", Variable.get('gcs_folder_destination'),
"--aws_region", Variable.get('aws_region'),
"--s3_endpoint", Variable.get('s3_endpoint')
"--gcs_credentials", Variable.get('google_cloud_credentials')
],
get_logs=True)
затем в наборе cli
import json
from google.cloud import storage
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_info(json.loads(gcs_credentials))
client = storage.Client(project='project_id', credentials=credentials)