Scala Spark: преобразование типа столбцов структуры в десятичный тип

#scala #apache-spark #apache-spark-sql #aws-glue

#scala #apache-spark #apache-spark-sql #aws-glue

Вопрос:

У меня есть csv, хранящийся в местоположении s3, в котором есть такие данные

 column1 | column2 | 
-------- ----------
| adsf  | 2000.0  |   
| fff   | 232.34  | 
 

У меня есть задание склеивания AWS в Scala, которое считывает этот файл в фрейм данных

 var srcDF= glueContext.getCatalogSource(database = '',
                                        tableName = '',
                                        redshiftTmpDir = "",
                                        transformationContext = "").getDynamicFrame().toDF()
 

Когда я печатаю схему, она выводит себя следующим образом

 srcDF.printSchema()

|-- column1 : string | 
|-- column2 : struct (double, string) | 
 

И фрейм данных выглядит так

 column1 | column2    | 
-------- -------------
| adsf  | [2000.0,]  |   
| fff   | [232.34,]  | 
 

Когда я пытаюсь сохранить фрейм данных в csv, он жалуется, что

 org.apache.spark.sql.AnalysisException CSV data source does not support struct<double:double,string:string> data type.
 

Как преобразовать фрейм данных, чтобы только столбцы типа Struct (если они существуют) были десятичными? Выводить следующим образом

 column1 | column2 | 
-------- ----------
| adsf | 2000.0   |   
| fff  | 232.34   | 
 

Редактировать:

Спасибо за ответ. Я попытался использовать следующий код

 df.select($"column2._1".alias("column2")).show()
 

Но получил ту же ошибку для обоих

 org.apache.spark.sql.AnalysisException No such struct field _1 in double, string;
 

Редактировать 2:

Кажется, что искра, столбцы были сглажены и переименованы в «double, string»

Итак, это решение сработало для меня

 df.select($"column2.double").show()
 

Ответ №1:

Вы можете извлекать поля из структуры с помощью getItem . Код может быть примерно таким:

 import spark.implicits._
import org.apache.spark.sql.functions.{col, getItem}

val df = Seq(
  ("adsf", (2000.0,"")),
  ("fff", (232.34,""))
).toDF("A", "B")
df.show()
df.select(col("A"), col("B").getItem("_1").as("B")).show()
 

он будет печатать:

 before select:
 ---- ---------- 
|   A|         B|
 ---- ---------- 
|adsf|[2000.0, ]|
| fff|[232.34, ]|
 ---- ---------- 

after select:
 ---- ------ 
|   A|     B|
 ---- ------ 
|adsf|2000.0|
| fff|232.34|
 ---- ------ 
 

Ответ №2:

Вы также можете использовать точечную нотацию column2._1 , чтобы получить поле структуры по имени:

 val df = Seq(
  ("adsf", (2000.0,"")),
  ("fff", (232.34,""))
).toDF("column1", "column2")

df.show
 ------- ---------- 
|column1|   column2|
 ------- ---------- 
|   adsf|[2000.0, ]|
|    fff|[232.34, ]|
 ------- ---------- 

val df2 = df.select($"column1", $"column2._1".alias("column2"))

df2.show
 ------- ------- 
|column1|column2|
 ------- ------- 
|   adsf| 2000.0|
|    fff| 232.34|
 ------- ------- 

df2.coalesce(1).write.option("header", "true").csv("output")
 

и ваш csv-файл будет в output/ папке:

 column1,column2
adsf,2000.0
fff,232.34