разнесите массив массива- (фрейм данных) PySpark

#python #apache-spark #pyspark #spark-dataframe

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть такой фрейм данных:

    ----- -------------------- 
|index|              merged|
 ----- -------------------- 
|    0|[[2.5, 2.4], [3.5...|
|    1|[[-1.0, -1.0], [-...|
|    2|[[-1.0, -1.0], [-...|
|    3|[[0.0, 0.0], [0.5...|
|    4|[[0.5, 0.5], [1.0...|
|    5|[[0.5, 0.5], [1.0...|
|    6|[[-1.0, -1.0], [0...|
|    7|[[0.0, 0.0], [0.5...|
|    8|[[0.5, 0.5], [1.0...|
 ----- -------------------- 
  

И я хочу разнести объединенный столбец в

  ----- ------- ------- 
|index|Column1|Column2|
 ----- ------- ------- 
|    0|    2.5|   2.4 |
|    1|    3.5|    0.5|
|    2|   -1.0|   -1.0|
|    3|   -1.0|   -1.0|
|    4|   0.0 |   0.0 |
|    5|    0.5|   0.74|
 ----- ------- ------- 
  

Каждый кортеж [[2.5, 2.4], [3.5,0,5]] восстановите два столбца, зная, что 2,5 и 3,5 будут сохранены в столбце 1, а (2,4,0,5) будут сохранены во втором столбце

Итак, я попробовал это

 df= df.withColumn("merged", df["merged"].cast("array<array<float>>"))
df= df.withColumn("merged",explode('merged'))
  

затем я применю udf для создания другого DF

но я не могу привести данные или применить explode, и я получил сообщение об ошибке

 pyspark.sql.utils.AnalysisException: u"cannot resolve 'cast(merged as array<array<float>)' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true)
  

Я также пробовал

 df= df.withColumn("merged", df["merged"].cast("array<string>"))
  

но ничего не работает
, и если я применяю explode без приведения, я получаю

 pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(merged)' due to data type mismatch: input to function explode should be array or map type, not StringType;
  

Комментарии:

1. можете ли вы дать схему df? похоже, что merged на самом деле является строкой, а не тем, что у вас есть в аргументе. Вы можете использовать split для разделения строки разделителем. Кроме того, похоже, что в вашем вопросе есть опечатки: разве индекс не одинаков для разнесенных значений в вашем примере ожидаемого результата? Или это то, что вы дали, что вы действительно хотите?

2. Спасибо, я перечитал свой код и обнаружил, что забыл добавить возвращаемый тип ArrayType(ArrayType(FloatType())) в мою лямбда-функцию (которая объединяет мои столбцы)

3. итак … проблема решена?

4. Да, большое вам спасибо

Ответ №1:

Вы могли бы попробовать следующий код:

 from pyspark import SparkConf, SparkContext                        
from pyspark.sql import SparkSession                               

from pyspark.sql.types import FloatType, StringType, IntegerType   
from pyspark.sql.functions import udf, col                         


def col1_calc(merged):                                             
    return merged[0][0]                                            

def col2_calc(merged):                                             
    return merged[0][1]                                            

if __name__ == '__main__':                                         
    spark = SparkSession                                          
        .builder                                                  
        .appName("Python Spark SQL Hive integration example")     
        .getOrCreate()                                             

    df = spark.createDataFrame([                                   
        (0, [[2.5,2.4],[3.5]]),                                    
        (1, [[-1.0,-1.0],[3.5]]),                                  
        (2, [[-1.0,-1.0],[3.5]]),                                  
    ], ["index", "merged"])                                        

    df.show()                                                      

    column1_calc = udf(col1_calc, FloatType())                     
    df = df.withColumn('Column1', column1_calc(df['merged']))      
    column2_calc = udf(col2_calc, FloatType())                     
    df = df.withColumn('Column2', column2_calc(df['merged']))      

    df = df.select(['Column1', 'Column2', 'index'])                
    df.show()         
  

Вывод:

  ------- ------- ----- 
|Column1|Column2|index|
 ------- ------- ----- 
|    2.5|    2.4|    0|
|   -1.0|   -1.0|    1|
|   -1.0|   -1.0|    2|
 ------- ------- -----