#scala #apache-spark #apache-spark-sql #apache-spark-dataset
#scala #apache-spark #apache-spark-sql #apache-spark-dataset
Вопрос:
Я очень доволен наборами данных Spark 2.0 из-за безопасности типов во время компиляции. Но вот пара проблем, которые я не могу решить, я также не нашел хорошей документации для этого.
Проблема № 1 — операция разделения для агрегированного столбца — рассмотрим приведенный ниже код — У меня есть набор данных [MyCaseClass], и я хотел группировать клавиши на c1, c2, c3 и sum (c4) / 8. Приведенный ниже код работает хорошо, если я просто вычисляю сумму, но он выдает ошибку времени компиляции для divide (8). Интересно, как я могу добиться следующего.
final case class MyClass (c1: String,
c2: String,
c3: String,
c4: Double)
val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded
import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
divide(8)). //this is breaking with exception
show()
Если я удалю операцию .divide(8) и выполню приведенную выше команду, это даст мне результат ниже.
----------- -------------
| key|sum(c4) |
----------- -------------
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
----------- -------------
Проблема № 2 — преобразование результата groupedByKey в другой типизированный фрейм данных —
Теперь вторая часть моей проблемы заключается в том, что я хочу снова вывести типизированный набор данных. Для этого у меня есть другой класс case (не уверен, нужен ли он), но я не уверен, как сопоставить с сгруппированным результатом —
final case class AnotherClass(c1: String,
c2: String,
c3: String,
average: Double)
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception
но это снова завершается ошибкой с исключением, поскольку результат, сгруппированный по ключу, напрямую не сопоставляется с другим классом.
PS: любое другое решение для достижения вышеуказанного более чем приветствуется.
Ответ №1:
Первая проблема может быть решена путем использования типизированных столбцов до конца ( KeyValueGroupedDataset.agg
ожидается TypedColumn(-s)
), вы можете определить результат агрегирования как:
val eight = lit(8.0)
.as[Double] // Not necessary
val sumByEight = typedSum[MyClass](_.c4)
.divide(eight)
.as[Double] // Required
.name("div(sum(c4), 8)")
и подключите его к следующему коду:
val myCaseClass = Seq(
MyClass("a", "b", "c", 2.0),
MyClass("a", "b", "c", 3.0)
).toDS
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(sumByEight)
чтобы получить
------- ---------------
| key|div(sum(c4), 8)|
------- ---------------
|[a,b,c]| 0.625|
------- ---------------
Вторая проблема является результатом использования класса, который не соответствует форме данных. Правильное представление может быть:
case class AnotherClass(key: (String, String, String), sum: Double)
который используется с данными, определенными выше:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(typedSum[MyClass](_.c4).name("sum"))
.as[AnotherClass]
дало бы:
------- ---
| key|sum|
------- ---
|[a,b,c]|5.0|
------- ---
но .as[AnotherClass]
здесь не обязательно, если Dataset[((String, String, String), Double)]
это приемлемо.
Вы, конечно, можете пропустить все это и просто mapGroups
(хотя и не без снижения производительности):
import shapeless.syntax.std.tuple._ // A little bit of shapeless
val tuples = myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.mapGroups((group, iter) => group : iter.map(_.c4).sum)
с результатом
--- --- --- ---
| _1| _2| _3| _4|
--- --- --- ---
| a| b| c|5.0|
--- --- --- ---
reduceGroups
может быть лучшим вариантом:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.reduceGroups((x, y) => x.copy(c4=x.c4 y.c4))
в результате Dataset
:
------- -----------
| _1| _2|
------- -----------
|[a,b,c]|[a,b,c,5.0]|
------- -----------
Комментарии:
1. Извините, этот код в «org.typelevel» %% «frameless-dataset-spark31» % «0.11.1» больше не будет компилироваться… как недавно изменился API, чтобы справиться с такой операцией?