PySpark

#python #apache-spark #pyspark #apache-spark-sql

Question:

I am trying to run a SQL query against the following PySpark DataFrame:

 +--------------------+
 |              values|
 +--------------------+
 |[1.09125882, 0.97...|
 |[1.0, 1.0, 1.0, 1...|
 |[1.06119951, 1.04...|
 |[1.0, 1.0, 1.0, 1...|
 |[1.0, 1.0, 1.0, 1...|
 |[1.0, 1.12954037,...|
 |[1.0, 1.08907695,...|
 |[1.0, 1.0, 1.0, 1...|
 |[1.017957352, 0.9...|
 |[1.015306123, 1.0...|
 |[1.0, 1.0, 1.0, 1...|
 |[1.015306123, 1.0...|
 |[1.07177033, 1.00...|
 |[1.0, 1.09094099,...|
 |[1.061907984, 1.0...|
 |[1.072550215, 1.0...|
 |[1.0, 1.0, 1.0, 1...|
 |[1.0, 1.0, 1.0, 1...|
 |[1.08173935, 1.04...|
 |[1.039907582, 1.0...|
 +--------------------+
only showing top 20 rows

 

I am following the code from this Databricks tutorial (it is the last example there). The query looks like this:

 query = """SELECT values,
         AGGREGATE(values,
           (1.0 AS product, 0 AS N),
           (buffer, value) -> (value * buffer.product, buffer.N + 1),
           buffer -> Power(buffer.product, 1.0 / buffer.N)) geomean FROM df_table"""
data_fin = spark.sql(query)
 

For reference, here is the schema of the DataFrame:

 root
 |-- values: array (nullable = true)
 |    |-- element: double (containsNull = true)
 

And I register the table with:

 data5.registerTempTable("df_table")
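
(Note: registerTempTable has been deprecated since Spark 2.0; the equivalent, non-deprecated call would be the one-liner below.)

 # Same effect as registerTempTable, but not deprecated (Spark 2.0+):
 data5.createOrReplaceTempView("df_table")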
 

However, I get a type error:

 An error was encountered:
 "cannot resolve 'aggregate(df_table.`values`, named_struct('product', 1.0BD, 'N', 0), lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable()), lambdafunction(POWER(CAST(namedlambdavariable().`product` AS DOUBLE), CAST((CAST(1.0BD AS DECIMAL(11,1)) / CAST(CAST(namedlambdavariable().`N` AS DECIMAL(10,0)) AS DECIMAL(11,1))) AS DOUBLE)), namedlambdavariable()))' due to data type mismatch: argument 3 requires struct<product:decimal(2,1),N:int> type, however, 'lambdafunction(named_struct('col1', (namedlambdavariable() * CAST(namedlambdavariable().`product` AS DOUBLE)), 'col2', (namedlambdavariable().`N` + 1)), namedlambdavariable(), namedlambdavariable())' is of struct<col1:double,col2:int> type.; line 2 pos 9;
 Project [values#514, aggregate(values#514, named_struct(product, 1.0, N, 0), lambdafunction(named_struct(col1, (lambda value#565 * cast(lambda buffer#564.product as double)), col2, (lambda buffer#564.N + 1)), lambda buffer#564, lambda value#565, false), lambdafunction(POWER(cast(lambda buffer#566.product as double), cast(CheckOverflow((promote_precision(cast(1.0 as decimal(11,1))) / promote_precision(cast(cast(lambda buffer#566.N as decimal(10,0)) as decimal(11,1)))), DecimalType(13,12)) as double)), lambda buffer#566, false)) AS geomean#563]
 +- SubqueryAlias `df_table`
    +- Project [zpid#26, zip_#455, values#514]
       +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 6 more fields]
          +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 5 more fields]
             +- Filter (zip_#455 = 02138)
                +- Project [_id#0, address#1, cmaToolCompCandidates#2, comps#3, data#4, description#5, hiResImageLink#6, homeType#7, hugePhotos#8, latitude#9, location#10, longitude#11, nearbyHomes#12, nearbySales#13, no#14, priceHist#15, priceHistory#16, propertyTaxRate#17, rentZestimate#18L, resoFacts#19, responsivePhotos#20, streetViewTileImageUrlMediumAddress#21, taxHistory#22, tourPhotos#23, ... 4 more fields]
                   +- Repartition 40, false
                      +- Relation[_id#0,address#1,cmaToolCompCandidates#2,comps#3,data#4,description#5,hiResImageLink#6,homeType#7,hugePhotos#8,latitude#9,location#10,longitude#11,nearbyHomes#12,nearbySales#13,no#14,priceHist#15,priceHistory#16,propertyTaxRate#17,rentZestimate#18L,resoFacts#19,responsivePhotos#20,streetViewTileImageUrlMediumAddress#21,taxHistory#22,tourPhotos#23,... 3 more fields] parquet"
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve 'aggregate(df_table.`values`, ...)' due to data type mismatch: argument 3 requires struct<product:decimal(2,1),N:int> type, however, 'lambdafunction(...)' is of struct<col1:double,col2:int> type.; line 2 pos 9; ..."
 

Is there a way to preserve the data types when composing the SQL query? I would have assumed that the schema of the source DataFrame is already correct for the SQL query.

Comments:

1. Are data_fin and data5 the same DataFrame or different ones?

2. They are different: data_fin is the DataFrame produced by transforming data5.

3. Can you show the printSchema() output for both DataFrames?

Answer #1:

When you use functions such as AGGREGATE or REDUCE (the two are aliases), the first parameter is the array column and the second parameter defines the initial value, whose literal also fixes the accumulator's data types. A bare 1.0 may be read as a Decimal, Double, or Float, and a bare 0 as a Byte, Short, Integer, or Long, so it is your responsibility to make the intended choice explicit. In your example the error arises because the implicitly chosen types do not match: Spark parses the literal 1.0 as DECIMAL(2,1), but the merge lambda multiplies it by a DOUBLE array element and therefore returns a DOUBLE, which is why the lambda's struct<col1:double,col2:int> fails against the required struct<product:decimal(2,1),N:int>.
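
To see which types Spark actually infers for the bare literals, you can ask it directly. A quick diagnostic sketch, assuming Spark 3.0+ where the typeof SQL function is available (the traceback above points to Spark 2.4, so treat this as illustrative rather than part of the original setup):

 # typeof() reports the inferred SQL type of each literal (Spark 3.0+).
 spark.sql("SELECT typeof(1.0) AS t_product, typeof(0) AS t_n").show()
 # t_product -> decimal(2,1), t_n -> int: exactly the
 # struct<product:decimal(2,1),N:int> from the error message.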

To guarantee that the data types are correct and that your query always runs, change (1.0 AS product, 0 AS N) to (cast(1 as double) AS product, cast(0 as double) AS N).
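
Below is a minimal end-to-end sketch of the fix. The two-row toy DataFrame and its numbers are invented for illustration; only the table name df_table and the shape of the query come from the question:

 from pyspark.sql import SparkSession

 spark = SparkSession.builder.getOrCreate()

 # Toy stand-in for data5: a single array<double> column named `values`.
 df = spark.createDataFrame([([1.0, 2.0, 4.0],), ([1.0, 1.0, 8.0],)], ["values"])
 df.createOrReplaceTempView("df_table")

 # Explicit casts in the initial value make the accumulator type
 # (double, double) match what the merge lambda returns.
 query = """SELECT values,
        AGGREGATE(values,
          (cast(1 as double) AS product, cast(0 as double) AS N),
          (buffer, value) -> (value * buffer.product, buffer.N + 1),
          buffer -> Power(buffer.product, 1.0 / buffer.N)) geomean FROM df_table"""
 spark.sql(query).show()
 # Each sample row multiplies to 8.0 over 3 elements, so geomean = 8 ** (1/3) = 2.0.

Typed literals should work as well (e.g. 1.0D for a DOUBLE), but the explicit casts make the intent easiest to read.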