#scala #apache-spark
#scala #apache-spark
Вопрос:
У меня есть RDD со следующей структурой:
(lang, id, name, max, min)
Я хочу добавить еще один столбец, total
, который содержит вычитание максимального значения столбца max
и минимального значения столбца min
для каждого уникального lang
(без уменьшения количества строк). Итак, я бы получил что-то вроде
rdd:
---- -- ---- --- ---
|lang|id|name|max|min|
---- -- ---- --- ---
| en| | | 5| 1|
| en| | | 2| 0|
| de| | | 9| 2|
| en| | | 7| 1|
| nl| | | 3| 0|
| nl| | | 5| 1|
---- -- ---- --- ---
Для
rdd:
---- -- ---- --- --- -----
|lang|id|name|max|min|total|
---- -- ---- --- --- -----
| en| | | 5| 1| 7|
| en| | | 2| 0| 7|
| de| | | 9| 2| 7|
| en| | | 7| 1| 7|
| nl| | | 3| 0| 5|
| nl| | | 5| 1| 5|
---- -- ---- --- --- -----
По соображениям совместимости я хочу добиться этого без использования DataFrames / Spark SQL.
Любое предложение высоко ценится!
Комментарии:
1. для lang = en max столбца max = 7 и min столбца min = 0 , поэтому общее количество должно быть 7 — 0 = 7, а не 6. Пожалуйста, просмотрите свой вывод и исправьте его
Ответ №1:
Вы можете агрегировать:
val rdd = sc.parallelize(Seq(
("en", "id1", "name1", 5, 1), ("en", "id2", "name2", 2, 0),
("de", "id3", "name3", 9, 2), ("en", "id4", "name4", 7, 1),
("nl", "id5", "name5", 3, 0), ("nl", "id6", "name6", 5, 1)
))
val totals = rdd.keyBy(_._1).aggregateByKey((Long.MinValue, Long.MaxValue))(
{ case ((maxA, minA), (_, _, _, maxX, minX)) =>
(Math.max(maxA, maxX), Math.min(minA, minX)) },
{ case ((maxA1, minA1), (maxA2, minA2)) =>
(Math.max(maxA1, maxA2), Math.min(minA1, minA2))}
).mapValues { case (max, min) => max - min }
объединение с исходными данными:
val vals = rdd.keyBy(_._1).join(totals).values
и сглаживание (с бесформенным):
import shapeless.syntax.std.tuple._
val result = vals.map { case (x, y) => x : y }
result.toDF.show
с выводом:
--- --- ----- --- --- ---
| _1| _2| _3| _4| _5| _6|
--- --- ----- --- --- ---
| en|id1|name1| 5| 1| 7|
| en|id2|name2| 2| 0| 7|
| en|id4|name4| 7| 1| 7|
| de|id3|name3| 9| 2| 7|
| nl|id5|name5| 3| 0| 5|
| nl|id6|name6| 5| 1| 5|
--- --- ----- --- --- ---
но для сложных агрегаций это становится утомительным, неэффективным и сложным в управлении довольно быстро.
Ответ №2:
Вы должны выполнить две операции с вашим RDD
1.Reducebykey
2. Присоединиться
val rdd = originalRDD.rdd.map(row =>
(row(0), (row(1).toString.toLong, row(2).toString.toLong))
)
Примените reduceByKey и получите минимальные и максимальные значения каждого lang
val filterRDD = jsonRdd.reduceByKey(minMax).map(row => (row._1, (row._2._1-row._2._2)))
def minMax(a: Tuple2[Long, Long], b: Tuple2[Long, Long]):Tuple2[Long,Long] = {
val min = if (a._1 < b._1) a._1 else b._1
val max = if (a._2 > b._2) a._2 else b._2
(min, max)
}
Применить условие соединения
rdd.join(filterRDD).map(row => (row._1, row._2._1._1, row._2._1._2, row._2._2))