Разделить элементы столбца на сумму элементов (одного столбца), сгруппированных по элементам другого столбца

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

Я работал над приложением aSspark и пытался преобразовать фрейм данных, как показано в таблице 1. Я хочу разделить каждый элемент столбца (_2) на сумму элементов (одного столбца), сгруппированных по элементам другого столбца (_1). Таблица 2 — ожидаемый результат.

таблица 1

  --- --- 
| _1| _2|
 --- --- 
|  0| 13|
|  0|  7|
|  0|  3|
|  0|  1|
|  0|  1|
|  1|  4|
|  1|  8|
|  1| 18|
|  1|  4|
 --- --- 
  

таблица 2

  --- ---- 
| _1| _2 |
 --- ---- 
|  0|13/x|
|  0| 7/x|
|  0| 3/x|
|  0| 1/x|
|  0| 1/x|
|  1| 4/y|
|  1| 8/y|
|  1|18/y|
|  1| 4/y|
 --- ---- 
  

где, x= (13 7 3 1 1) и y = (4 8 18 4)

Затем я хочу вычислить энтропию для каждого элемента в столбце _1: т. е. Для каждого элемента в столбце _1 вычислить сумму (p_i x log (p_i)) в столбце _2. Где p_i — это в основном значения в столбце _2 для каждого значения в столбце _1 в таблице 2.

Конечный результат будет.

  --- --------- 
| _1| ENTROPY |
 --- --------- 
|  0|entropy_1|
|  1|entropy_2|
 --- --------- 
  

Как я могу реализовать это в spark (предпочтительно в scala)? Каков был бы оптимизированный способ выполнения вышеуказанных операций? Я новичок в scala, любые связанные предложения будут высоко оценены.

Спасибо.

Ответ №1:

Если вам нужно краткое решение, а группы достаточно малы, вы можете использовать оконные функции. Сначала вы должны определить окно:

 import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
  

вероятность:

 import org.apache.spark.sql.functions.sum

val p = $"_2" / sum($"_2").over(w)
val withP = df.withColumn("p", p)
  

и, наконец, энтропия:

 import org.apache.spark.sql.functions.log2

withP.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
  

Для примера данных

 val df = Seq(
  (0, 13), (0, 7), (0, 3), (0, 1), (0, 1), (1, 4), (1, 8), (1, 18), (1, 4)).toDF
  

результат:

  --- ------------------ 
| _1|           entropy|
 --- ------------------ 
|  1|1.7033848993102918|
|  0|1.7433726580786888|
 --- ------------------ 
  

Если оконные функции неприемлемы с точки зрения производительности, вы можете попробовать aggregation-join-aggregation:

 df.groupBy($"_1").agg(sum("_2").alias("total"))
  .join(df, Seq("_1"), "inner")
  .withColumn("p", $"_2" / $"total")
  .groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
  

где:

 df.groupBy($"_1").agg(sum("_2").alias("total"))
  

вычисляет сумму _2 по _1 ,

 _.join(df, Seq("_1"), "inner")
  

добавляет агрегированный столбец к исходным данным,

 _.withColumn("p", $"_2" / $"total")
  

вычисляет вероятности и:

 _.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
  

агрегирует для получения энтропии.