Spark SQL: преобразование некоторых строк в столбцы

#apache-spark-sql

#apache-spark-sql

Вопрос:

Есть ли способ преобразовать org.apache.spark.sql.DataFrame подобное

 Predictor  icaoCode  num1  num2
P1         OTHH       1.1   1.2
P1         ZGGG       2.1   2.2
P2         OTHH       3.1   3.2
P2         ZGGG       4.1   4.2
P3         OTHH       5.1   5.2
P3         ZGGG       6.1   6.2
.           .          .     .
.           .          .     .
.           .          .     .
  

в DataFrame подобное?

 icaoCode  P1.num1  P1.num2  P2.num1  P2.num2  P3.num1  P3.num2  ...
OTHH          1.1      1.2      3.1      3.2      5.1      5.2  ...
ZGGG          2.1      2.2      4.1      4.2      6.1      6.2  ...
 .             .        .        .        .        .        .   ...    
 .             .        .        .        .        .        .   ...    
 .             .        .        .        .        .        .   ...    
  

Может быть произвольное количество значений для Predictor и для icaoCode .

Ответ №1:

В Spark 1.6.0 есть pivot функция для преобразования / транспонирования ваших данных. В вашем случае требуется некоторая предварительная обработка для подготовки данных pivot . Вот пример того, как я бы это сделал:

 def doPivot(): Unit = {
  val sqlContext: SQLContext = new org.apache.spark.sql.SQLContext(sc)

  // dummy data
  val r1 = Input("P1", "OTHH", 1.1, 1.2)
  val r2 = Input("P1", "ZGGG", 2.1, 2.2)
  val r3 = Input("P2", "OTHH", 3.1, 3.2)

  val records = Seq(r1, r2, r3)
  val df = sqlContext.createDataFrame(records)

  // prepare data for pivot
  val fullName: ((String, String) => String) = (predictor: String, num: String) => {
    predictor   "."   num
  }
  val udfFullName = udf(fullName)
  val dfFullName = df.withColumn("num1-complete", udfFullName(col("predictor"), lit("num1")))
    .withColumn("num2-complete", udfFullName(col("predictor"), lit("num2")))

  val dfPrepared = dfFullName.select(col("icaoCode"), col("num1") as "num", col("num1-complete") as "value")
    .unionAll(dfFullName.select(col("icaoCode"), col("num2") as "num", col("num2-complete") as "value"))

  // transpose/pivot dataframe
  val dfPivoted = dfPrepared.groupBy(col("icaoCode")).pivot("value").mean("num")
  dfPivoted.show()
}

case class Input(predictor: String, icaoCode: String, num1: Double, num2: Double)
  

Окончательный фрейм данных должен работать на вас:

  -------- ------- ------- ------- ------- 
|icaoCode|P1.num1|P1.num2|P2.num1|P2.num2|
 -------- ------- ------- ------- ------- 
|    OTHH|    1.1|    1.2|    3.1|    3.2|
|    ZGGG|    2.1|    2.2|   null|   null|
 -------- ------- ------- ------- -------