#scala #apache-spark #dataframe #group-by #spark-dataframe
#scala #apache-spark #фрейм данных #группирование по #apache-spark-sql
Вопрос:
У меня есть фрейм данных, мне нужны две разные группы в одном фрейме данных.
---- ------- -------- ----------------------------
| id | type | item | value | timestamp |
---- ------- -------- ----------------------------
| 1 | rent | dvd | 12 |2016-09-19T00:00:00Z
| 1 | rent | dvd | 12 |2016-09-19T00:00:00Z
| 1 | buy | tv | 12 |2016-09-20T00:00:00Z
| 1 | rent | movie | 12 |2016-09-20T00:00:00Z
| 1 | buy | movie | 12 |2016-09-18T00:00:00Z
| 1 | buy | movie | 12 |2016-09-18T00:00:00Z
---- ------- ------- ------------------------------
Я хотел бы получить результат в виде :
id : 1
totalValue : 72 --- group by based on id
typeCount : {"rent" : 3, "buy" : 3} --- group by based on id
itemCount : {"dvd" : 2, "tv" : 1, "movie" : 3 } --- group by based on id
typeForDay : {"rent: 2, "buy" : 2 } --- group By based on id and dayofmonth(col("timestamp")) atmost 1 type per day
Я пытался :
val count_by_value = udf {( listValues :scala.collection.mutable.WrappedArray[String]) => if (listValues == null) null else listValues.groupBy(identity).mapValues(_.size)}
val group1 = df.groupBy("id").agg(collect_list("type"),sum("value") as "totalValue", collect_list("item"))
val group1Result = group1.withColumn("typeCount", count_by_value($"collect_list(type)"))
.drop("collect_list(type)")
.withColumn("itemCount", count_by_value($"collect_list(item)"))
.drop("collect_list(item)")
val group2 = df.groupBy("id", dayofmonth(col("timestamp"))).agg(collect_set("type"))
val group2Result = group2.withColumn("typeForDay", count_by_value($"collect_set(type)"))
.drop("collect_set(type)")
val groupedResult = group1Result.join(group2Result, "id").show()
Но это требует времени, есть ли другой эффективный способ сделать это?
Ответ №1:
Лучший подход — добавить каждое групповое поле в ключ и уменьшить их вместо groupBy() . Вы можете использовать эти:
df1.map(rec => (rec(0), rec(3).toString().toInt)).
reduceByKey(_ _).take(5).foreach(println)
=> (1,72)
df1.map(rec => ((rec(0), rec(1)), 1)).
map(x => (x._1._1, x._1._2,x._2)).
reduceByKey(_ _).take(5).foreach(println)
=>(1, аренда, 3)
(1, buy, 3)
df1.map(rec => ((rec(0), rec(2)), 1)).
map(x => (x._1._1, x._1._2,x._2)).
reduceByKey(_ _).take(5).foreach(println)
=>(1, dvd, 2)
(1, tv, 1)
(1, фильм, 3)
df1.map(rec => ((rec(0), rec(1), rec(4).toString().substring(8,10)), 1)).
reduceByKey(_ _).map(x => (x._1._1, x._1._2,x._1._3,x._2)).
take(5).foreach(println)
=>(1, аренда, 19,2)
(1, buy,20,1)
(1, buy,18,2)
(1, аренда,20,1)
Комментарии:
1. Значит, это похоже на объединение ваших подходов 1, 2 и 3?
2. Разве вы не поняли, это решения для каждого из ваших ожидаемых результатов.
3. Да, я понимаю ваши 1, 2 и 3, но не уверен в конечном результате. Нужно ли мне присоединяться к этим 3?
4. Что вам нужно для объединения? Он уже сгруппирован по идентификатору, типу и дню месяца.
5. О, теперь мне пришло в голову, это по «дню» месяца. Я взял часть даты по дате. Возможно, потребуется преобразовать в datetime и извлечь день.