Определить ArrayType в CSV sample Spark

#csv #apache-spark

#csv #apache-spark

Вопрос:

Мне нужно определить тестовый образец с ArrayType для Spark для чтения этих данных. Вот как выглядит схема данных:

  |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- stat: float (nullable = true)
 |-- naming: string (nullable = true)
  

Мое текущее определение поля данных показывает нулевые значения для всех строк, так как я могу структурно определить эти данные в файле CSV?

Вот как теперь выглядит структура моих файлов CSV:

 "data1_id","data1_stat","data2_id","data2_stat","data3_id","data3_stat","naming"
"1","0.76","2","0.55","3","0.16","Default1"
"1","0.2","2","0.41","3","0.89","Default2"
"1","0.96","2","0.12","3","0.4","Default3"
"1","0.28","2","0.15","3","0.31","Default4"
"1","0.84","2","0.41","3","0.15","Default5"
  

Когда я вызываю show во входном фрейме данных, я получаю этот результат:

  ------- ----------- 
|data   |naming     |                                                                                  
 ------- ----------- 
|null   |Default1   |
|null   |Default2   |
|null   |Default3   |
|null   |Default4   |
|null   |Default5   |
 ------- ----------- 
  

Ожидаемый результат:

  ---------------------------- ----------- 
|data                        |naming     |                                                                                  
 ---------------------------- ----------- 
|[[1,0.76],[2,0.55],[3,0.16]]|Default1   |
|[[1,0.2],[2,0.41],[3,0.89]] |Default2   |
|[[1,0.96],[2,0.12],[3,0.4]] |Default3   |
|[[1,0.28],[2,0.15],[3,0.31]]|Default4   |
|[[1,0.84],[2,0.41],[3,0.15]]|Default5   |
 ---------------------------- ----------- 
  

Ответ №1:

Вам нужно преобразовать данные и создать выражения, такие как array(struct(<add your columns>))

 scala> df.show(false)
 -------- ---------- -------- ---------- -------- ---------- -------- 
|data1_id|data1_stat|data2_id|data2_stat|data3_id|data3_stat|naming  |
 -------- ---------- -------- ---------- -------- ---------- -------- 
|1       |0.76      |2       |0.55      |3       |0.16      |Default1|
|1       |0.2       |2       |0.41      |3       |0.89      |Default2|
|1       |0.96      |2       |0.12      |3       |0.4       |Default3|
|1       |0.28      |2       |0.15      |3       |0.31      |Default4|
|1       |0.84      |2       |0.41      |3       |0.15      |Default5|
 -------- ---------- -------- ---------- -------- ---------- -------- 
  

Извлечение необходимых столбцов для массива

 scala> val arrayColumns = df
            .columns
            .filter(_.contains("data"))
            .map(_.split("_")(0))
            .distinct
            .map(c => struct(col(s"${c}_id").as("id"),col(s"${c}_stat").as("stat")))

scala> val colExpr = array(arrayColumns:_*).as("data")
  

Применение выражения colExpr к фрейму данных.

 scala> val finalDf = df.select(colExpr,$"naming")
  

Схема

 scala> finalDf.printSchema
root
 |-- data: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- id: string (nullable = true)
 |    |    |-- stat: string (nullable = true)
 |-- naming: string (nullable = true)
  

Результат

 scala> finalDf.show(false)
 ------------------------------ -------- 
|data                          |naming  |
 ------------------------------ -------- 
|[[1,0.76], [2,0.55], [3,0.16]]|Default1|
|[[1,0.2], [2,0.41], [3,0.89]] |Default2|
|[[1,0.96], [2,0.12], [3,0.4]] |Default3|
|[[1,0.28], [2,0.15], [3,0.31]]|Default4|
|[[1,0.84], [2,0.41], [3,0.15]]|Default5|
 ------------------------------ --------