Доступ к GOOGLE_APPLICATION_CREDENTIALS в операторе kubernetes

#google-cloud-storage #airflow

#google-облачное хранилище #поток воздуха

Вопрос:

Я добавляю задачу в базу данных airflow следующим образом:

 examples_task = KubernetesPodOperator(
    task_id='examples_generation',
    dag=dag,
    namespace='test',
    image='test_amazon_image',
    name='pipe-labelled-examples-generation-tf-record-operator',
    env={
        'GOOGLE_APPLICATION_CREDENTIALS': Variable.get('google_cloud_credentials')
    },
    arguments=[
        "--assets_path", Variable.get('assets_path'),
        "--folder_source", Variable.get('folder_source'),
        "--folder_destination", Variable.get('folder_destination'),
        "--gcs_folder_destination", Variable.get('gcs_folder_destination'),
        "--aws_region", Variable.get('aws_region'),
        "--s3_endpoint", Variable.get('s3_endpoint')
    ],
    get_logs=True)
 

Я думал, что смогу вставить файл service account json в качестве переменной и вызвать его, но это не работает, и документация airflow / Google неясна. Как вы это делаете?

Комментарии:

1. Вы пытались использовать соединение Airflow в пользовательском интерфейсе, а затем добавить имя соединения в операторе?

2. Я не пробовал этого, так как это был независимый от airflow интерфейс командной строки python, который я хочу использовать для независимого запуска kubernetes Pod operator. Я изучу это.

3. Кроме того, ваша конфигурация не может работать, потому GOOGLE_APPLICATION_CREDENTIALS что переменная должна ссылаться на файл, а не непосредственно на содержимое файла.

4. Да, я так и подумал.

5. Есть идеи о наилучшем способе реализации этого?

Ответ №1:

Решения для переноса json в аргумент

 examples_task = KubernetesPodOperator(
    task_id='examples_generation',
    dag=dag,
    namespace='test',
    image='test_amazon_image',
    name='pipe-labelled-examples-generation-tf-record-operator',
    arguments=[
        "--folder_source", Variable.get('folder_source'),
        "--folder_destination", Variable.get('folder_destination'),
        "--gcs_folder_destination", Variable.get('gcs_folder_destination'),
        "--aws_region", Variable.get('aws_region'),
        "--s3_endpoint", Variable.get('s3_endpoint')
        "--gcs_credentials", Variable.get('google_cloud_credentials')

    ],
    get_logs=True)
 

затем в наборе cli

 import json
from google.cloud import storage
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_info(json.loads(gcs_credentials))
client = storage.Client(project='project_id', credentials=credentials)