#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
Я работал над приложением aSspark и пытался преобразовать фрейм данных, как показано в таблице 1. Я хочу разделить каждый элемент столбца (_2) на сумму элементов (одного столбца), сгруппированных по элементам другого столбца (_1). Таблица 2 — ожидаемый результат.
таблица 1
--- ---
| _1| _2|
--- ---
| 0| 13|
| 0| 7|
| 0| 3|
| 0| 1|
| 0| 1|
| 1| 4|
| 1| 8|
| 1| 18|
| 1| 4|
--- ---
таблица 2
--- ----
| _1| _2 |
--- ----
| 0|13/x|
| 0| 7/x|
| 0| 3/x|
| 0| 1/x|
| 0| 1/x|
| 1| 4/y|
| 1| 8/y|
| 1|18/y|
| 1| 4/y|
--- ----
где, x= (13 7 3 1 1) и y = (4 8 18 4)
Затем я хочу вычислить энтропию для каждого элемента в столбце _1: т. е. Для каждого элемента в столбце _1 вычислить сумму (p_i x log (p_i)) в столбце _2. Где p_i — это в основном значения в столбце _2 для каждого значения в столбце _1 в таблице 2.
Конечный результат будет.
--- ---------
| _1| ENTROPY |
--- ---------
| 0|entropy_1|
| 1|entropy_2|
--- ---------
Как я могу реализовать это в spark (предпочтительно в scala)? Каков был бы оптимизированный способ выполнения вышеуказанных операций? Я новичок в scala, любые связанные предложения будут высоко оценены.
Спасибо.
Ответ №1:
Если вам нужно краткое решение, а группы достаточно малы, вы можете использовать оконные функции. Сначала вы должны определить окно:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
вероятность:
import org.apache.spark.sql.functions.sum
val p = $"_2" / sum($"_2").over(w)
val withP = df.withColumn("p", p)
и, наконец, энтропия:
import org.apache.spark.sql.functions.log2
withP.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Для примера данных
val df = Seq(
(0, 13), (0, 7), (0, 3), (0, 1), (0, 1), (1, 4), (1, 8), (1, 18), (1, 4)).toDF
результат:
--- ------------------
| _1| entropy|
--- ------------------
| 1|1.7033848993102918|
| 0|1.7433726580786888|
--- ------------------
Если оконные функции неприемлемы с точки зрения производительности, вы можете попробовать aggregation-join-aggregation:
df.groupBy($"_1").agg(sum("_2").alias("total"))
.join(df, Seq("_1"), "inner")
.withColumn("p", $"_2" / $"total")
.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
где:
df.groupBy($"_1").agg(sum("_2").alias("total"))
вычисляет сумму _2
по _1
,
_.join(df, Seq("_1"), "inner")
добавляет агрегированный столбец к исходным данным,
_.withColumn("p", $"_2" / $"total")
вычисляет вероятности и:
_.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
агрегирует для получения энтропии.