Искровая сумма по ключу без уменьшения строк

#scala #apache-spark

#scala #apache-spark

Вопрос:

У меня есть RDD со следующей структурой:

 (lang, id, name, max, min)
  

Я хочу добавить еще один столбец, total , который содержит вычитание максимального значения столбца max и минимального значения столбца min для каждого уникального lang (без уменьшения количества строк). Итак, я бы получил что-то вроде

 rdd:
 ---- -- ---- --- --- 
|lang|id|name|max|min|
 ---- -- ---- --- --- 
|  en|  |    |  5|  1|
|  en|  |    |  2|  0|
|  de|  |    |  9|  2|
|  en|  |    |  7|  1|
|  nl|  |    |  3|  0|
|  nl|  |    |  5|  1|
 ---- -- ---- --- --- 
  

Для

 rdd:
 ---- -- ---- --- --- ----- 
|lang|id|name|max|min|total|
 ---- -- ---- --- --- ----- 
|  en|  |    |  5|  1|    7|
|  en|  |    |  2|  0|    7|
|  de|  |    |  9|  2|    7|
|  en|  |    |  7|  1|    7|
|  nl|  |    |  3|  0|    5|
|  nl|  |    |  5|  1|    5|
 ---- -- ---- --- --- ----- 
  

По соображениям совместимости я хочу добиться этого без использования DataFrames / Spark SQL.

Любое предложение высоко ценится!

Комментарии:

1. для lang = en max столбца max = 7 и min столбца min = 0 , поэтому общее количество должно быть 7 — 0 = 7, а не 6. Пожалуйста, просмотрите свой вывод и исправьте его

Ответ №1:

Вы можете агрегировать:

 val rdd = sc.parallelize(Seq(
  ("en", "id1", "name1", 5,  1), ("en", "id2", "name2", 2,  0), 
  ("de", "id3", "name3", 9,  2), ("en", "id4", "name4", 7,  1),
  ("nl", "id5", "name5", 3,  0), ("nl", "id6", "name6", 5,  1)
))

val totals = rdd.keyBy(_._1).aggregateByKey((Long.MinValue, Long.MaxValue))(
  { case ((maxA, minA), (_, _, _, maxX, minX)) => 
    (Math.max(maxA, maxX), Math.min(minA, minX)) }, 
  { case ((maxA1, minA1), (maxA2, minA2)) => 
    (Math.max(maxA1, maxA2), Math.min(minA1, minA2))}
).mapValues { case (max, min) => max - min }
  

объединение с исходными данными:

 val vals = rdd.keyBy(_._1).join(totals).values
  

и сглаживание (с бесформенным):

 import shapeless.syntax.std.tuple._

val result = vals.map { case (x, y) => x :  y }

result.toDF.show
  

с выводом:

  --- --- ----- --- --- ---  
| _1| _2|   _3| _4| _5| _6|
 --- --- ----- --- --- --- 
| en|id1|name1|  5|  1|  7|
| en|id2|name2|  2|  0|  7|
| en|id4|name4|  7|  1|  7|
| de|id3|name3|  9|  2|  7|
| nl|id5|name5|  3|  0|  5|
| nl|id6|name6|  5|  1|  5|
 --- --- ----- --- --- --- 
  

но для сложных агрегаций это становится утомительным, неэффективным и сложным в управлении довольно быстро.

Ответ №2:

Вы должны выполнить две операции с вашим RDD

1.Reducebykey

2. Присоединиться

  val rdd = originalRDD.rdd.map(row => 
 (row(0), (row(1).toString.toLong, row(2).toString.toLong))
 )
  

Примените reduceByKey и получите минимальные и максимальные значения каждого lang

 val filterRDD = jsonRdd.reduceByKey(minMax).map(row => (row._1, (row._2._1-row._2._2)))

  def minMax(a: Tuple2[Long, Long], b: Tuple2[Long, Long]):Tuple2[Long,Long] = {
  val min = if (a._1 < b._1) a._1 else b._1
  val max = if (a._2 > b._2) a._2 else b._2
  (min, max)
  }
  

Применить условие соединения

  rdd.join(filterRDD).map(row => (row._1, row._2._1._1, row._2._1._2, row._2._2))