#csv #apache-spark
#csv #apache-spark
Вопрос:
Мне нужно определить тестовый образец с ArrayType для Spark для чтения этих данных. Вот как выглядит схема данных:
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: integer (nullable = true)
| | |-- stat: float (nullable = true)
|-- naming: string (nullable = true)
Мое текущее определение поля данных показывает нулевые значения для всех строк, так как я могу структурно определить эти данные в файле CSV?
Вот как теперь выглядит структура моих файлов CSV:
"data1_id","data1_stat","data2_id","data2_stat","data3_id","data3_stat","naming"
"1","0.76","2","0.55","3","0.16","Default1"
"1","0.2","2","0.41","3","0.89","Default2"
"1","0.96","2","0.12","3","0.4","Default3"
"1","0.28","2","0.15","3","0.31","Default4"
"1","0.84","2","0.41","3","0.15","Default5"
Когда я вызываю show во входном фрейме данных, я получаю этот результат:
------- -----------
|data |naming |
------- -----------
|null |Default1 |
|null |Default2 |
|null |Default3 |
|null |Default4 |
|null |Default5 |
------- -----------
Ожидаемый результат:
---------------------------- -----------
|data |naming |
---------------------------- -----------
|[[1,0.76],[2,0.55],[3,0.16]]|Default1 |
|[[1,0.2],[2,0.41],[3,0.89]] |Default2 |
|[[1,0.96],[2,0.12],[3,0.4]] |Default3 |
|[[1,0.28],[2,0.15],[3,0.31]]|Default4 |
|[[1,0.84],[2,0.41],[3,0.15]]|Default5 |
---------------------------- -----------
Ответ №1:
Вам нужно преобразовать данные и создать выражения, такие как array(struct(<add your columns>))
scala> df.show(false)
-------- ---------- -------- ---------- -------- ---------- --------
|data1_id|data1_stat|data2_id|data2_stat|data3_id|data3_stat|naming |
-------- ---------- -------- ---------- -------- ---------- --------
|1 |0.76 |2 |0.55 |3 |0.16 |Default1|
|1 |0.2 |2 |0.41 |3 |0.89 |Default2|
|1 |0.96 |2 |0.12 |3 |0.4 |Default3|
|1 |0.28 |2 |0.15 |3 |0.31 |Default4|
|1 |0.84 |2 |0.41 |3 |0.15 |Default5|
-------- ---------- -------- ---------- -------- ---------- --------
Извлечение необходимых столбцов для массива
scala> val arrayColumns = df
.columns
.filter(_.contains("data"))
.map(_.split("_")(0))
.distinct
.map(c => struct(col(s"${c}_id").as("id"),col(s"${c}_stat").as("stat")))
scala> val colExpr = array(arrayColumns:_*).as("data")
Применение выражения colExpr к фрейму данных.
scala> val finalDf = df.select(colExpr,$"naming")
Схема
scala> finalDf.printSchema
root
|-- data: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- id: string (nullable = true)
| | |-- stat: string (nullable = true)
|-- naming: string (nullable = true)
Результат
scala> finalDf.show(false)
------------------------------ --------
|data |naming |
------------------------------ --------
|[[1,0.76], [2,0.55], [3,0.16]]|Default1|
|[[1,0.2], [2,0.41], [3,0.89]] |Default2|
|[[1,0.96], [2,0.12], [3,0.4]] |Default3|
|[[1,0.28], [2,0.15], [3,0.31]]|Default4|
|[[1,0.84], [2,0.41], [3,0.15]]|Default5|
------------------------------ --------