#python #apache-spark #pyspark #spark-dataframe
#python #apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть такой фрейм данных:
----- --------------------
|index| merged|
----- --------------------
| 0|[[2.5, 2.4], [3.5...|
| 1|[[-1.0, -1.0], [-...|
| 2|[[-1.0, -1.0], [-...|
| 3|[[0.0, 0.0], [0.5...|
| 4|[[0.5, 0.5], [1.0...|
| 5|[[0.5, 0.5], [1.0...|
| 6|[[-1.0, -1.0], [0...|
| 7|[[0.0, 0.0], [0.5...|
| 8|[[0.5, 0.5], [1.0...|
----- --------------------
И я хочу разнести объединенный столбец в
----- ------- -------
|index|Column1|Column2|
----- ------- -------
| 0| 2.5| 2.4 |
| 1| 3.5| 0.5|
| 2| -1.0| -1.0|
| 3| -1.0| -1.0|
| 4| 0.0 | 0.0 |
| 5| 0.5| 0.74|
----- ------- -------
Каждый кортеж [[2.5, 2.4], [3.5,0,5]] восстановите два столбца, зная, что 2,5 и 3,5 будут сохранены в столбце 1, а (2,4,0,5) будут сохранены во втором столбце
Итак, я попробовал это
df= df.withColumn("merged", df["merged"].cast("array<array<float>>"))
df= df.withColumn("merged",explode('merged'))
затем я применю udf для создания другого DF
но я не могу привести данные или применить explode, и я получил сообщение об ошибке
pyspark.sql.utils.AnalysisException: u"cannot resolve 'cast(merged as array<array<float>)' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true)
Я также пробовал
df= df.withColumn("merged", df["merged"].cast("array<string>"))
но ничего не работает
, и если я применяю explode без приведения, я получаю
pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(merged)' due to data type mismatch: input to function explode should be array or map type, not StringType;
Комментарии:
1. можете ли вы дать схему df? похоже, что merged на самом деле является строкой, а не тем, что у вас есть в аргументе. Вы можете использовать
split
для разделения строки разделителем. Кроме того, похоже, что в вашем вопросе есть опечатки: разве индекс не одинаков для разнесенных значений в вашем примере ожидаемого результата? Или это то, что вы дали, что вы действительно хотите?2. Спасибо, я перечитал свой код и обнаружил, что забыл добавить возвращаемый тип ArrayType(ArrayType(FloatType())) в мою лямбда-функцию (которая объединяет мои столбцы)
3. итак … проблема решена?
4. Да, большое вам спасибо
Ответ №1:
Вы могли бы попробовать следующий код:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, StringType, IntegerType
from pyspark.sql.functions import udf, col
def col1_calc(merged):
return merged[0][0]
def col2_calc(merged):
return merged[0][1]
if __name__ == '__main__':
spark = SparkSession
.builder
.appName("Python Spark SQL Hive integration example")
.getOrCreate()
df = spark.createDataFrame([
(0, [[2.5,2.4],[3.5]]),
(1, [[-1.0,-1.0],[3.5]]),
(2, [[-1.0,-1.0],[3.5]]),
], ["index", "merged"])
df.show()
column1_calc = udf(col1_calc, FloatType())
df = df.withColumn('Column1', column1_calc(df['merged']))
column2_calc = udf(col2_calc, FloatType())
df = df.withColumn('Column2', column2_calc(df['merged']))
df = df.select(['Column1', 'Column2', 'index'])
df.show()
Вывод:
------- ------- -----
|Column1|Column2|index|
------- ------- -----
| 2.5| 2.4| 0|
| -1.0| -1.0| 1|
| -1.0| -1.0| 2|
------- ------- -----