#apache-spark #pyspark #google-bigquery #databricks #databricks-connect
# #apache-spark #pyspark #google-bigquery #databricks #databricks-connect
Вопрос:
Я пытаюсь подключить BigQuery Dataset к Databrick и запустить скрипт с помощью Pyspark.
Процедуры, которые я выполнил:
- Я исправил BigQuery Json API для databrick в dbfs для доступа к соединению.
- Затем я добавил spark-bigquery-latest.jar в библиотеке кластера, и я запустил свой скрипт.
Когда я запускаю этот скрипт, я не столкнулся с какой-либо ошибкой.
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName('bq')
.master('local[4]')
.config('parentProject', 'google-project-ID')
.config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
.getOrCreate()
)
df = spark.read.format("bigquery").option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json")
.option("parentProject", "google-project-ID")
.option("project", "Dataset-Name")
.option("table","dataset.schema.tablename")
.load()
df.show()
Но вместо вызова одной таблицы в этой схеме я попытался вызвать все таблицы под ней, используя запрос типа:
from pyspark.sql import SparkSession
from google.cloud import bigquery
spark = (
SparkSession.builder
.appName('bq')
.master('local[4]')
.config('parentProject', 'google-project-ID')
.config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
.getOrCreate()
)
client = bigquery.Client()
table_list = 'dataset.schema'
tables = client.list_tables(table_list)
for table in tables:
tlist = tlist.append(table)
for i in tlist:
sql_query = """select * from `dataset.schema.' i '`"""
df = spark.read.format("bigquery").option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json")
.option("parentProject", "google-project-ID")
.option("project", "Dataset-Name")
.option("query", sql_query).load()
df.show()
или
Этот скрипт:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName('bq')
.master('local[4]')
.config('parentProject', 'google-project-ID')
.config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
.getOrCreate()
)
sql_query = """select * from `dataset.schema.tablename`"""
df = spark.read.format("bigquery").option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json")
.option("parentProject", "google-project-ID")
.option("project", "Dataset-Name")
.option("query", sql_query).load()
df.show()
Я получаю эту необычную ошибку:
IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment. Please set a project ID using the builder.
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<command-131090852> in <module>
35 .option("parentProject", "google-project-ID")
36 .option("project", "Dataset-Name")
---> 37 .option("query", sql_query).load()
38 #df.show()
39
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
182 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
183 else:
--> 184 return self._df(self._jreader.load())
185
186 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
131 # Hide where the exception came from that shows a non-Pythonic
132 # JVM exception message.
--> 133 raise_from(converted)
134 else:
135 raise
/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)
IllegalArgumentException: A project ID is required for this service but could not be determined from the builder or the environment. Please set a project ID using the builder.
Он распознает мой идентификатор проекта, когда я вызываю его как таблицу, но когда я запускаю его как запрос, я получаю эту ошибку.
Я попытался разобраться в этом и просмотрел множество сайтов в поисках ответа, но не смог получить четкий ответ на него.
Помощь очень ценится… Заранее спасибо…
Ответ №1:
Можете ли вы избежать использования запросов и просто использовать опцию table?
from pyspark.sql import SparkSession
from google.cloud import bigquery
spark = (
SparkSession.builder
.appName('bq')
.master('local[4]')
.config('parentProject', 'google-project-ID')
.config('spark.jars', 'dbfs:/FileStore/jars/jarlocation.jar')
.getOrCreate()
)
client = bigquery.Client()
table_list = 'dataset.schema'
tables = client.list_tables(table_list)
for table in tables:
tlist = tlist.append(table)
for i in tlist:
df = spark.read.format("bigquery").option("credentialsFile", "/dbfs/FileStore/tables/bigqueryapi.json")
.option("parentProject", "google-project-ID")
.option("project", "Dataset-Name")
.option("table","dataset.schema." str(i))
.load()
df.show()
Комментарии:
1. Что, если мне нужно отменить несколько вложенных столбцов в таблице, которую я получаю от BigQuery через dataframe, используя параметр запроса при чтении dataframe !?
2. @NaveenB пожалуйста, задайте другой вопрос. это слишком далеко от текущего вопроса, и здесь недостаточно места для ответа на него.
Ответ №2:
В моем случае у меня было такое же исключение, но потому, что я не указывал значение конфигурации parentProject
, которое является идентификатором проекта BigQuery, к которому я подключаюсь