Как выполнить 2 разных условия groupby в одном и том же фрейме данных в Scala?

#scala #apache-spark #dataframe #group-by #spark-dataframe

#scala #apache-spark #фрейм данных #группирование по #apache-spark-sql

Вопрос:

У меня есть фрейм данных, мне нужны две разные группы в одном фрейме данных.

  ---- ------- -------- ---------------------------- 
| id | type  | item   | value  | timestamp         |
 ---- ------- -------- ---------------------------- 
| 1 |  rent  |  dvd   |  12    |2016-09-19T00:00:00Z
| 1 |  rent  |  dvd   |  12    |2016-09-19T00:00:00Z
| 1 | buy    |  tv    |  12    |2016-09-20T00:00:00Z
| 1 |  rent  |  movie |  12    |2016-09-20T00:00:00Z
| 1 |   buy  |  movie |  12    |2016-09-18T00:00:00Z
| 1 | buy    |  movie |  12    |2016-09-18T00:00:00Z
 ---- ------- ------- ------------------------------  
  

Я хотел бы получить результат в виде :

 id : 1
totalValue  : 72 --- group by based on id
typeCount : {"rent" : 3, "buy" : 3} --- group by based on id
itemCount : {"dvd" : 2, "tv" : 1, "movie" : 3 } --- group by based on id
typeForDay : {"rent: 2, "buy" : 2 }  --- group By based on id and dayofmonth(col("timestamp"))  atmost 1 type per day 
  

Я пытался :

 val count_by_value = udf {( listValues :scala.collection.mutable.WrappedArray[String]) => if (listValues == null) null else  listValues.groupBy(identity).mapValues(_.size)}


val group1 = df.groupBy("id").agg(collect_list("type"),sum("value") as "totalValue", collect_list("item")) 

val group1Result =  group1.withColumn("typeCount", count_by_value($"collect_list(type)"))
                          .drop("collect_list(type)")
                          .withColumn("itemCount", count_by_value($"collect_list(item)"))
                          .drop("collect_list(item)")


val group2 = df.groupBy("id", dayofmonth(col("timestamp"))).agg(collect_set("type")) 

val group2Result =  group2.withColumn("typeForDay", count_by_value($"collect_set(type)"))
                          .drop("collect_set(type)")


val groupedResult = group1Result.join(group2Result, "id").show()
  

Но это требует времени, есть ли другой эффективный способ сделать это?

Ответ №1:

Лучший подход — добавить каждое групповое поле в ключ и уменьшить их вместо groupBy() . Вы можете использовать эти:

 df1.map(rec => (rec(0), rec(3).toString().toInt)).
     reduceByKey(_ _).take(5).foreach(println)
  

=> (1,72)

 df1.map(rec => ((rec(0), rec(1)), 1)).
    map(x => (x._1._1, x._1._2,x._2)).
    reduceByKey(_ _).take(5).foreach(println)
  

=>(1, аренда, 3)

(1, buy, 3)

 df1.map(rec => ((rec(0), rec(2)), 1)).
    map(x => (x._1._1, x._1._2,x._2)).
    reduceByKey(_ _).take(5).foreach(println)
  

=>(1, dvd, 2)

(1, tv, 1)

(1, фильм, 3)

 df1.map(rec => ((rec(0), rec(1), rec(4).toString().substring(8,10)), 1)).
    reduceByKey(_ _).map(x => (x._1._1, x._1._2,x._1._3,x._2)).
    take(5).foreach(println)
  

=>(1, аренда, 19,2)

(1, buy,20,1)

(1, buy,18,2)

(1, аренда,20,1)

Комментарии:

1. Значит, это похоже на объединение ваших подходов 1, 2 и 3?

2. Разве вы не поняли, это решения для каждого из ваших ожидаемых результатов.

3. Да, я понимаю ваши 1, 2 и 3, но не уверен в конечном результате. Нужно ли мне присоединяться к этим 3?

4. Что вам нужно для объединения? Он уже сгруппирован по идентификатору, типу и дню месяца.

5. О, теперь мне пришло в голову, это по «дню» месяца. Я взял часть даты по дате. Возможно, потребуется преобразовать в datetime и извлечь день.