Воздушный поток — выберите данные таблицы bigquery в фрейм данных

# #google-bigquery #airflow #google-cloud-composer

#google-bigquery #воздушный поток #google-облако-композитор

Вопрос:

Я пытаюсь выполнить следующий DAG в композиторе воздушного потока в облаке Google, и я продолжаю получать ту же ошибку: conn_id hard_coded_project_name не определен

Может быть, кто-нибудь может указать мне правильное направление?

 from airflow.models import DAG  import os  from airflow.operators.dummy import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator  import datetime import pandas as pd  from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator from airflow.contrib.operators.bigquery_operator import BigQueryOperator  from airflow.providers.google.cloud.operators import bigquery from airflow.contrib.hooks.bigquery_hook import BigQueryHook  default_args = {  'start_date': datetime.datetime(2020, 1, 1), }  PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")  def list_dates_in_df():  hook = BigQueryHook(bigquery_conn_id=PROJECT_ID,  use_legacy_sql=False)  bq_client = bigquery.Client(project = hook._get_field("project"),  credentials = hook._get_credentials())  query = "select count(*) from LP_RAW.DIM_ACCOUNT;"  df = bq_client.query(query).to_dataframe()     with DAG(  'df_test',   schedule_interval=None,   catchup = False,   default_args=default_args  ) as dag:      list_dates = PythonOperator(  task_id ='list_dates',  python_callable = list_dates_in_df  )   list_dates  

Ответ №1:

Это означает, что PROJECT_ID , как видно из строки

 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")  

было присвоено значение hard_coded_project_name , так GCP_PROJECT_ID как не имеет значения.

Затем на линии

 hook = BigQueryHook(bigquery_conn_id=PROJECT_ID...  

строка hard_coded_project_name автоматически ассоциируется с идентификатором подключения в Airflow, и у нее нет значения или она не существует.


Чтобы избежать этой ошибки, вы можете выполнить любые действия, чтобы исправить это.

  1. Создайте идентификатор подключения для обоих GCP_PROJECT_ID , и hard_coded_project_name просто чтобы мы были уверены, что у обоих есть значения. Но если мы не хотим создавать соединение для GCP_PROJECT_ID , убедитесь, что hard_coded_project_name оно имеет значение, чтобы был запасной вариант. Вы можете сделать это с помощью
  • Открытие экземпляра воздушного потока.
  • Нажмите «Администратор» gt; «Подключения».
  • Нажмите «Создать».
  • Заполните «Идентификатор соединения», «Тип соединения» как «hard_coded_project_name» и «Облачная платформа Google» соответственно.
  • Заполните «Идентификатор проекта» вашим фактическим значением идентификатора проекта
  • Выполните эти действия в другой раз, чтобы создать GCP_PROJECT_ID
  • Соединение должно выглядеть следующим образом (как минимум, при условии, что идентификатор проекта будет работать. Но не стесняйтесь добавлять файл ключа или его содержимое и область действия, чтобы у вас не возникало проблем с аутентификацией в дальнейшем):

введите описание изображения здесь

  1. Вы можете использовать bigquery_default вместо hard_coded_project_name so по умолчанию, он будет указывать на проект, в котором запущен экземпляр Airflow. Ваш обновленный PROJECT_ID код назначения будет
     PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "bigquery_default")  

Кроме того, при тестировании кода вы можете столкнуться с ошибкой в строке

 bq_client = bigquery.Client(project = hook._get_field("project")...  

так Client() как не существует from airflow.providers.google.cloud.operators import bigquery , вы должны использовать from google.cloud import bigquery вместо этого.

Вот фрагмент теста, в котором я только что создал hard_coded_project_name , поэтому PROJECT_ID буду использовать это соединение.Я подсчитал количество своих столов, и это сработало: введите описание изображения здесь

Вот фрагмент теста, который я сделал, когда использовал bigquery_default , где я получил количество своих таблиц, и это сработало: введите описание изображения здесь

Комментарии:

1. Отлично! Вариант 2 сработал! Спасибо!