Воздушный поток: как выбрать данные таблицы bigquery в dataframe

#python #pandas #airflow

#python #панды #воздушный поток

Вопрос:

Я новичок в воздушном потоке. Я создал свою первую базу данных ниже, выбрав данные из таблицы больших запросов Google и сохранив их в кадре данных pd. Требуется предложение ниже

  1. Где я должен указать идентификатор соединения моего большого запроса
  2. Поскольку pd.read_gbq требует аутентификации, как обрабатывать то же самое в базах данных airflow

импорт ОС

 import pandas as pd

from airflow.contrib.operators import bigquery_operatorfrom  #this will 
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook



def test_gbq():
    sql = "SELECT * FROM `bi-projects-300910.raw_data.production_time_records_cdc`"
    df = pd.read_gbq(sql, dialect='standard')
    print(len(df))


default_args = {'owner': 'Martin james',
                'depends_on_past': False,
                'start_date': datetime(2018, 2, 26, 11, 30),
                'email_on_retry': true,
                'retries': 2
                }

dag = DAG('test_gbq',
          default_args=default_args,
          schedule_interval="0 12 * * *")



testing = PythonOperator(
    task_id="test",
    python_callable=test_python,
    dag=dag)

testing
 

Ответ №1:

В общем, сначала вы должны создать учетную запись службы (и ключ для нее), которая имеет доступ к вашему проекту BigQuery и наборам данных https://cloud.google.com/iam/docs/creating-managing-service-accounts .

Затем вам нужно будет установить учетные данные этой учетной записи службы в pandas https://pandas-gbq.readthedocs.io/en/latest/howto/authentication.html#authenticating-with-a-service-account

Почему вы не используете оператор BigQuery в Airflow? https://pandas-gbq.readthedocs.io/en/latest/howto/authentication.html#authenticating-with-a-service-account ИМХО, это немного проще в настройке. После создания учетной записи службы вы можете настроить свое соединение в пользовательском интерфейсе воздушного потока и просто указать идентификатор соединения в операторе

Ответ №2:

В настоящее время MWAA несовместим с некоторыми инструментами Google, но я нашел этот пост, в котором объясняется, как интегрировать зависимости из s3, я интегрирую его, и он работает идеально

bigquery-and-amazon-managed-apache-airflow

Что касается соединений, вы можете добавить их в пользовательский интерфейс.

/управление-соединения