# #google-bigquery #airflow #google-cloud-composer
#google-bigquery #воздушный поток #google-облако-композитор
Вопрос:
Я пытаюсь выполнить следующий DAG в композиторе воздушного потока в облаке Google, и я продолжаю получать ту же ошибку: conn_id hard_coded_project_name
не определен
Может быть, кто-нибудь может указать мне правильное направление?
from airflow.models import DAG import os from airflow.operators.dummy import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator import datetime import pandas as pd from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator from airflow.contrib.operators.bigquery_operator import BigQueryOperator from airflow.providers.google.cloud.operators import bigquery from airflow.contrib.hooks.bigquery_hook import BigQueryHook default_args = { 'start_date': datetime.datetime(2020, 1, 1), } PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name") def list_dates_in_df(): hook = BigQueryHook(bigquery_conn_id=PROJECT_ID, use_legacy_sql=False) bq_client = bigquery.Client(project = hook._get_field("project"), credentials = hook._get_credentials()) query = "select count(*) from LP_RAW.DIM_ACCOUNT;" df = bq_client.query(query).to_dataframe() with DAG( 'df_test', schedule_interval=None, catchup = False, default_args=default_args ) as dag: list_dates = PythonOperator( task_id ='list_dates', python_callable = list_dates_in_df ) list_dates
Ответ №1:
Это означает, что PROJECT_ID
, как видно из строки
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")
было присвоено значение hard_coded_project_name
, так GCP_PROJECT_ID
как не имеет значения.
Затем на линии
hook = BigQueryHook(bigquery_conn_id=PROJECT_ID...
строка hard_coded_project_name
автоматически ассоциируется с идентификатором подключения в Airflow, и у нее нет значения или она не существует.
Чтобы избежать этой ошибки, вы можете выполнить любые действия, чтобы исправить это.
- Создайте идентификатор подключения для обоих
GCP_PROJECT_ID
, иhard_coded_project_name
просто чтобы мы были уверены, что у обоих есть значения. Но если мы не хотим создавать соединение дляGCP_PROJECT_ID
, убедитесь, чтоhard_coded_project_name
оно имеет значение, чтобы был запасной вариант. Вы можете сделать это с помощью
- Открытие экземпляра воздушного потока.
- Нажмите «Администратор» gt; «Подключения».
- Нажмите «Создать».
- Заполните «Идентификатор соединения», «Тип соединения» как «hard_coded_project_name» и «Облачная платформа Google» соответственно.
- Заполните «Идентификатор проекта» вашим фактическим значением идентификатора проекта
- Выполните эти действия в другой раз, чтобы создать
GCP_PROJECT_ID
- Соединение должно выглядеть следующим образом (как минимум, при условии, что идентификатор проекта будет работать. Но не стесняйтесь добавлять файл ключа или его содержимое и область действия, чтобы у вас не возникало проблем с аутентификацией в дальнейшем):
- Вы можете использовать
bigquery_default
вместоhard_coded_project_name
so по умолчанию, он будет указывать на проект, в котором запущен экземпляр Airflow. Ваш обновленныйPROJECT_ID
код назначения будетPROJECT_ID = os.environ.get("GCP_PROJECT_ID", "bigquery_default")
Кроме того, при тестировании кода вы можете столкнуться с ошибкой в строке
bq_client = bigquery.Client(project = hook._get_field("project")...
так Client()
как не существует from airflow.providers.google.cloud.operators import bigquery
, вы должны использовать from google.cloud import bigquery
вместо этого.
Вот фрагмент теста, в котором я только что создал hard_coded_project_name
, поэтому PROJECT_ID
буду использовать это соединение.Я подсчитал количество своих столов, и это сработало:
Вот фрагмент теста, который я сделал, когда использовал bigquery_default
, где я получил количество своих таблиц, и это сработало:
Комментарии:
1. Отлично! Вариант 2 сработал! Спасибо!