#python #apache-spark #pyspark #apache-spark-sql
Question:
I am trying to run a SQL query against the following PySpark DataFrame:
+--------------------+
|              values|
+--------------------+
|[1.09125882, 0.97...|
|[1.0, 1.0, 1.0, 1...|
|[1.06119951, 1.04...|
|[1.0, 1.0, 1.0, 1...|
|[1.0, 1.0, 1.0, 1...|
|[1.0, 1.12954037,...|
|[1.0, 1.08907695,...|
|[1.0, 1.0, 1.0, 1...|
|[1.017957352, 0.9...|
|[1.015306123, 1.0...|
|[1.0, 1.0, 1.0, 1...|
|[1.015306123, 1.0...|
|[1.07177033, 1.00...|
|[1.0, 1.09094099,...|
|[1.061907984, 1.0...|
|[1.072550215, 1.0...|
|[1.0, 1.0, 1.0, 1...|
|[1.0, 1.0, 1.0, 1...|
|[1.08173935, 1.04...|
|[1.039907582, 1.0...|
+--------------------+
only showing top 20 rows
I am referencing the code from this Databricks tutorial (it is the last example there). The query is as follows:
query = """SELECT values,
AGGREGATE(values,
(1.0 AS product, 0 AS N),
(buffer, value) -> (value * buffer.product, buffer.N 1),
buffer -> Power(buffer.product, 1.0 / buffer.N)) geomean FROM df_table"""
data_fin = spark.sql(query)
For reference, here is the schema of the DF:
root
 |-- values: array (nullable = true)
 |    |-- element: double (containsNull = true)
And I register the table with:
data5.registerTempTable("df_table")
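For context, a minimal self-contained version of this setup could look like the sketch below. The data is made up; only the column name values and the view name df_table come from the question. Note that registerTempTable still works but is deprecated in newer Spark versions; createOrReplaceTempView is the modern equivalent:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative stand-in for data5: a single array<double> column named "values"
data5 = spark.createDataFrame(
    [([1.09125882, 0.97, 1.0],), ([1.0, 1.0, 1.0],)],
    "values: array<double>",
)

# registerTempTable is deprecated; createOrReplaceTempView is the current API
data5.createOrReplaceTempView("df_table")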
However, I get a type error:
An error was encountered:
"cannot resolve 'aggregate(df_table.`values`, named_struct('product', 1.0BD, 'N', 0), lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable()), lambdafunction(POWER(CAST(namedlambdavariable().`product` AS DOUBLE), CAST((CAST(1.0BD AS DECIMAL(11,1)) / CAST(CAST(namedlambdavariable().`N` AS DECIMAL(10,0)) AS DECIMAL(11,1))) AS DOUBLE)), namedlambdavariable()))' due to data type mismatch: argument 3 requires struct<product:decimal(2,1),N:int> type, however, 'lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable())' is of struct<col1:double,col2:int> type.; line 2 pos 9;
Project [values#514, aggregate(values#514, named_struct(product, 1.0, N, 0), lambdafunction(named_struct(col1, (lambda value#565 * cast(lambda buffer#564.product as double)), col2, (lambda buffer#564.N + 1)), lambda buffer#564, lambda value#565, false), lambdafunction(POWER(cast(lambda buffer#566.product as double), cast(CheckOverflow((promote_precision(cast(1.0 as decimal(11,1))) / promote_precision(cast(cast(lambda buffer#566.N as decimal(10,0)) as decimal(11,1)))), DecimalType(13,12)) as double)), lambda buffer#566, false)) AS geomean#563]
+- SubqueryAlias `df_table`
   +- Project [zpid#26, zip_#455, values#514]
      +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 6 more fields]
         +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 5 more fields]
            +- Filter (zip_#455 = 02138)
               +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 4 more fields]
                  +- Repartition 40, false
                     +- Relation[_id#0,address#1,cmaToolCompCandidates#2,comps#3,data#4,description#5,hiResImageLink#6,homeType#7,hugePhotos#8,latitude#9,location#10,longitude#11,nearbyHomes#12,nearbySales#13,no#14,priceHist#15,priceHistory#16,propertyTaxRate#17,rentZestimate#18L,resoFacts#19,responsivePhotos#20,streetViewTileImageUrlMediumAddress#21,taxHistory#22,tourPhotos#23,... 3 more fields] parquet
"
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve 'aggregate(df_table.`values`, ...)' due to data type mismatch: argument 3 requires struct<product:decimal(2,1),N:int> type, however, 'lambdafunction(...)' is of struct<col1:double,col2:int> type.; line 2 pos 9"
Is there a way to preserve the data types when putting together the SQL query? I would have assumed the schema of the source DF is already correct for the SQL query.
Comments:
1. Are data_fin and data5 the same or different?
2. They are different. data_fin is the DF that results from transforming data5.
3. Can you show printSchema for both dataframes?
Answer #1:
When you need to run functions such as AGGREGATE or REDUCE (the two are aliases), the first parameter is the array value, and in the second parameter you must define the default values and their types. You can write 1.0 (Decimal, Double or Float) or 0 (Boolean, Byte, Short, Integer or Long), but it is then up to you to make sure the type Spark infers is the one you actually intend. In your example the error occurs because the implicitly chosen data types do not match: the literal 1.0 is parsed as DECIMAL(2,1) and 0 as INT, while the merge lambda produces a DOUBLE and an INT.
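As a quick sanity check, Spark 3.0+ exposes typeof(), which shows how those zero-value literals are typed. A minimal sketch (the asker's Spark 2.x does not have typeof, but the literal typing rules are the same):
# typeof() reveals how the zero-value literals are parsed by the SQL parser
spark.sql("SELECT typeof(1.0) AS t_product, typeof(0) AS t_n").show()
# +------------+---+
# |   t_product|t_n|
# +------------+---+
# |decimal(2,1)|int|
# +------------+---+
The buffer struct therefore becomes struct<product:decimal(2,1), N:int>, which cannot hold the struct<double, int> returned by the merge lambda.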
To guarantee that the data types are correct and your query always runs, change (1.0 AS product, 0 AS N) to (cast(1 as double) AS product, cast(0 as double) AS N).
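Putting it together, a runnable sketch of the corrected query (using the illustrative df_table setup from above, not the asker's real data):
query = """SELECT values,
        AGGREGATE(values,
          (cast(1 as double) AS product, cast(0 as double) AS N),
          (buffer, value) -> (value * buffer.product, buffer.N + 1),
          buffer -> Power(buffer.product, 1.0 / buffer.N)) geomean FROM df_table"""
data_fin = spark.sql(query)
data_fin.show(truncate=False)
On Spark 3.1+ the same computation can also be expressed with the DataFrame API, which sidesteps the temp view and the literal-typing issue entirely; a sketch of that alternative (not what the Databricks tutorial uses):
from pyspark.sql import functions as F

# F.lit(1.0) and F.lit(0.0) are Python floats, so the buffer struct is
# struct<product:double, N:double> from the start -- no implicit DECIMAL
geomean = F.aggregate(
    "values",
    F.struct(F.lit(1.0).alias("product"), F.lit(0.0).alias("N")),
    lambda buf, v: F.struct(
        (v * buf["product"]).alias("product"),
        (buf["N"] + F.lit(1.0)).alias("N"),
    ),
    lambda buf: F.pow(buf["product"], F.lit(1.0) / buf["N"]),
)
data5.select("values", geomean.alias("geomean")).show(truncate=False)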