#scala #apache-spark #apache-zeppelin #livy
#scala #apache-spark #apache-zeppelin #ливи
Вопрос:
У меня есть набор данных, в котором у меня есть порядковые номера, например, 0 и 1.
Category Value Sequences
1 10 0
1 11 1
1 13 1
1 16 1
1 20 0
1 21 0
1 22 1
1 25 1
1 27 1
1 29 1
1 30 0
1 32 1
1 34 1
1 35 1
1 38 0
Здесь 1 в столбце последовательности встречается трижды. Мне нужно суммировать только это значение последовательности.
Я пытаюсь сделать это, используя приведенный ниже код:
%livy2.spark
import org.apache.spark.rdd.RDD
val df = df.select( $"Category", $"Value", $"Sequences").rdd.groupBy(x =>
(x.getInt(0))
).map(
x => {
val Category= x(0).getInt(0)
val Value= x(0).getInt(1)
val Sequences = x(0).getInt(2)
for (i <- x.indices){
val vi = x(i).getFloat(4)
if (vi(0) >0 )
{
summing = Value//
}
(Category, summing)
}
}
)
df_new.take(10).foreach(println)
Когда я писал этот код, возникает ошибка, указывающая на это неполное утверждение.
Значение df представляет набор данных, который я дал изначально.
Ожидаемый результат:
Category summing
1 40
1 103
1 101
Я не знаю, где я отстаю. Было бы здорово, если бы кто-нибудь помог мне изучить эту новую вещь.
Ответ №1:
Это можно сделать, присвоив каждой строке уникальный идентификатор, а затем включив каждую единицу в группу, указанную следующим нулевым уникальным идентификатором:
val df = Seq(
(1, 10, 0),
(1, 11, 1),
(1, 13, 1),
(1, 16, 1),
(1, 20, 0),
(1, 21, 0),
(1, 22, 1),
(1, 25, 1),
(1, 27, 1),
(1, 29, 1),
(1, 30, 0),
(1, 32, 1),
(1, 34, 1),
(1, 35, 1),
(1, 38, 0)
).toDF("Category", "Value", "Sequences")
// assign each row unique id
val zipped = df.withColumn("zip", monotonically_increasing_id())
// Make range from zero to next zero
val categoryWindow = Window.partitionBy("Category").orderBy($"zip")
val groups = zipped
.filter($"Sequences" === 0)
.withColumn("rangeEnd", lead($"zip", 1).over(categoryWindow))
.withColumnRenamed("zip", "rangeStart")
println("Groups:")
groups.show(false)
// Assign range for each unit
val joinCondition = ($"units.zip" > $"groups.rangeStart").and($"units.zip" < $"groups.rangeEnd")
val unitsByRange = zipped
.filter($"Sequences" === 1).alias("units")
.join(groups.alias("groups"), joinCondition, "left")
.select("units.Category", "units.Value", "groups.rangeStart")
println("Units in groups:")
unitsByRange.show(false)
// Group by range
val result = unitsByRange
.groupBy($"Category", $"rangeStart")
.agg(sum("Value").alias("summing"))
.orderBy("rangeStart")
.drop("rangeStart")
println("Result:")
result.show(false)
Вывод:
Groups:
-------- ----- --------- ---------- ----------
|Category|Value|Sequences|rangeStart|rangeEnd |
-------- ----- --------- ---------- ----------
|1 |10 |0 |0 |4 |
|1 |20 |0 |4 |5 |
|1 |21 |0 |5 |8589934595|
|1 |30 |0 |8589934595|8589934599|
|1 |38 |0 |8589934599|null |
-------- ----- --------- ---------- ----------
Units in groups:
-------- ----- ----------
|Category|Value|rangeStart|
-------- ----- ----------
|1 |11 |0 |
|1 |13 |0 |
|1 |16 |0 |
|1 |22 |5 |
|1 |25 |5 |
|1 |27 |5 |
|1 |29 |5 |
|1 |32 |8589934595|
|1 |34 |8589934595|
|1 |35 |8589934595|
-------- ----- ----------
Result:
-------- -------
|Category|summing|
-------- -------
|1 |40 |
|1 |103 |
|1 |101 |
-------- -------