Наборы данных Spark 2.0 Групповая клавиша и разделение операций и безопасность типов

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

Вопрос:

Я очень доволен наборами данных Spark 2.0 из-за безопасности типов во время компиляции. Но вот пара проблем, которые я не могу решить, я также не нашел хорошей документации для этого.

Проблема № 1 — операция разделения для агрегированного столбца — рассмотрим приведенный ниже код — У меня есть набор данных [MyCaseClass], и я хотел группировать клавиши на c1, c2, c3 и sum (c4) / 8. Приведенный ниже код работает хорошо, если я просто вычисляю сумму, но он выдает ошибку времени компиляции для divide (8). Интересно, как я могу добиться следующего.

 final case class MyClass (c1: String,
                          c2: String,
                          c3: String,
                          c4: Double)

    val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded

    import sparkSession.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

     myCaseClass.
       groupByKey(myCaseClass =>
          (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
          agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
          divide(8)). //this is breaking with exception
       show()
  

Если я удалю операцию .divide(8) и выполню приведенную выше команду, это даст мне результат ниже.

  ----------- ------------- 
|        key|sum(c4)      |
 ----------- ------------- 
| [A1,F2,S1]|         80.0|
| [A1,F1,S1]|         40.0|  
 ----------- ------------- 
  

Проблема № 2 — преобразование результата groupedByKey в другой типизированный фрейм данных —
Теперь вторая часть моей проблемы заключается в том, что я хочу снова вывести типизированный набор данных. Для этого у меня есть другой класс case (не уверен, нужен ли он), но я не уверен, как сопоставить с сгруппированным результатом —

 final case class AnotherClass(c1: String,
                          c2: String,
                          c3: String,
                          average: Double) 

 myCaseClass.
           groupByKey(myCaseClass =>
              (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
              agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception
  

но это снова завершается ошибкой с исключением, поскольку результат, сгруппированный по ключу, напрямую не сопоставляется с другим классом.

PS: любое другое решение для достижения вышеуказанного более чем приветствуется.

Ответ №1:

Первая проблема может быть решена путем использования типизированных столбцов до конца ( KeyValueGroupedDataset.agg ожидается TypedColumn(-s) ), вы можете определить результат агрегирования как:

 val eight = lit(8.0)
  .as[Double]  // Not necessary

val sumByEight = typedSum[MyClass](_.c4)
  .divide(eight)
  .as[Double]  // Required
  .name("div(sum(c4), 8)")
  

и подключите его к следующему коду:

 val myCaseClass = Seq(
  MyClass("a", "b", "c", 2.0),
  MyClass("a", "b", "c", 3.0)
).toDS

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .agg(sumByEight)
  

чтобы получить

  ------- --------------- 
|    key|div(sum(c4), 8)|
 ------- --------------- 
|[a,b,c]|          0.625|
 ------- --------------- 
  

Вторая проблема является результатом использования класса, который не соответствует форме данных. Правильное представление может быть:

 case class AnotherClass(key: (String, String, String), sum: Double)
  

который используется с данными, определенными выше:

  myCaseClass
   .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
   .agg(typedSum[MyClass](_.c4).name("sum"))
   .as[AnotherClass]
  

дало бы:

  ------- --- 
|    key|sum|
 ------- --- 
|[a,b,c]|5.0|
 ------- --- 
  

но .as[AnotherClass] здесь не обязательно, если Dataset[((String, String, String), Double)] это приемлемо.

Вы, конечно, можете пропустить все это и просто mapGroups (хотя и не без снижения производительности):

 import shapeless.syntax.std.tuple._   // A little bit of shapeless

val tuples = myCaseClass
 .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
 .mapGroups((group, iter) => group :  iter.map(_.c4).sum)
  

с результатом

  --- --- --- ---    
| _1| _2| _3| _4|
 --- --- --- --- 
|  a|  b|  c|5.0|
 --- --- --- --- 
  

reduceGroups может быть лучшим вариантом:

 myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
  .reduceGroups((x, y) => x.copy(c4=x.c4   y.c4))
  

в результате Dataset :

  ------- -----------     
|     _1|         _2|
 ------- ----------- 
|[a,b,c]|[a,b,c,5.0]|
 ------- ----------- 
  

Комментарии:

1. Извините, этот код в «org.typelevel» %% «frameless-dataset-spark31» % «0.11.1» больше не будет компилироваться… как недавно изменился API, чтобы справиться с такой операцией?