#python #google-cloud-platform #google-bigquery #airflow
#python #google-облачная платформа #google-bigquery #воздушный поток
Вопрос:
Я пытаюсь создать представление bigquery в airflow с помощью крючка bigquery и выдает следующую ошибку. Пожалуйста, найдите фрагмент кода ниже:
BigQueryHook().create_empty_table(
dataset_id="data_loader_view",
table_id="customerdata",
view={
"query": "SELECT * FROM data_loader.customerdata",
"useLegacySql": False
}
)
Ошибка обратной трассировки:
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 930, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execut
return_value = self.execute_callable(
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callabl
return self.python_callable(*self.op_args, **self.op_kwargs
File "/home/airflow/gcs/dags/bigquery_view_dag.py", line 37, in create_bq_vie
view_util.create_bq_view_util(context['final_dict']
File "/home/airflow/gcs/dags/utils/view.py", line 29, in create_bq_view_uti
"useLegacySql": Fals
File "/usr/local/lib/airflow/airflow/providers/google/common/hooks/base_google.py", line 356, in inner_wrappe
return func(self, *args, **kwargs
File "/usr/local/lib/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 339, in create_empty_tabl
retry=retr
File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 544, in create_tabl
retry, method="POST", path=path, data=data, timeout=timeou
File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/bigquery/client.py", line 556, in _call_ap
return call(
File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_fun
on_error=on_error
File "/opt/python3.6/lib/python3.6/site-packages/google/api_core/retry.py", line 184, in retry_targe
return target(
File "/opt/python3.6/lib/python3.6/site-packages/google/cloud/_http.py", line 423, in api_reques
raise exceptions.from_http_response(response
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<project_name>/datasets/data_loader_view/tables: Invalid JSON payload received. Unknown name "query" at 'table.view': Proto field is not repeating, cannot start list
[2020-10-01 04:25:30,857] {base_task_runner.py:115} INFO - Job 53554: Subtask create_bq_view [2020-10-01 04:25:30,612] {taskinstance.py:1059} ERROR - 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/<project_name>/datasets/data_loader_view/tables: Invalid JSON payload received. Unknown name "query" at 'table.view': Proto field is not repeating, cannot start list.
Ссылочная документация:
https://airflow.apache.org/docs/stable/_api/airflow/contrib/hooks/bigquery_hook/index.html#airflow.contrib.hooks.bigquery_hook.BigQueryBaseCursor.create_empty_table
Пожалуйста, помогите мне решить эту проблему
Ответ №1:
Попробуйте использовать приведенный ниже код в качестве примера:
#!/usr/bin/python
# -*- coding: utf-8 -*-
from airflow.providers.google.cloud.hooks import bigquery
from airflow.operators import python_operator
from airflow import models
import datetime
#Creating function that calls the view creation
def create_view(ds, **kwargs):
bigquery.BigQueryHook().create_empty_table(dataset_id='<my-dataset>',
table_id='customerdata',
view={'query': 'SELECT * FROM `<my-dataset>.<my-source-table>`',
'useLegacySql': False})
default_dag_args = {
'start_date': datetime.datetime(2020, 10, 2, 11, 5),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=1),
'project_id': '<my-project-id>',
}
with models.DAG('bigquery_airflow', schedule_interval='*/5 * * * *',
default_args=default_dag_args) as dag:
create_view = python_operator.PythonOperator(task_id='succeeded',
provide_context=True, python_callable=create_view)
create_view
Ответ №2:
Вы пытались добавить свое фактическое название проекта в свой код?
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/**<project_name>**/datasets/data_loader_view/tables
Кроме того, из предоставленной вами документации эта часть содержит идентификатор проекта, в отличие от вашего кода, который содержит только два компонента ( data_loader.customerdata
)
view = {
"query": "SELECT * FROM `**test-project-id**.test_dataset_id.test_table_prefix*` LIMIT 1000",
"useLegacySql": False
}
Комментарии:
1. Я добавил идентификатор проекта в запрос и в код. Я не работаю.