Scala Spark создает новый столбец в фрейме данных на основе совокупного количества значений в другом столбце

#scala #apache-spark

#scala #apache-spark

Вопрос:

У меня есть фрейм данных spark, подобный приведенному ниже

  ----- ---------- ---------- 
|   ID|      date|  count   |
 ----- ---------- ---------- 
|54500|2016-05-02|         0|
|54500|2016-05-09|         0|
|54500|2016-05-16|         0|
|54500|2016-05-23|         0|
|54500|2016-06-06|         0|
|54500|2016-06-13|         0|
|54441|2016-06-20|         0|
|54441|2016-06-27|         0|
|54441|2016-07-04|         0|
|54441|2016-07-11|         0|
 ----- ---------- ---------- 
  

Я хочу добавить дополнительный столбец, содержащий количество записей для определенного идентификатора в фрейме данных, избегая цикла for . Целевой фрейм данных выглядит следующим образом

  ----- ---------- ---------- 
|   ID|      date|  count   |
 ----- ---------- ---------- 
|54500|2016-05-02|         6|
|54500|2016-05-09|         6|
|54500|2016-05-16|         6|
|54500|2016-05-23|         6|
|54500|2016-06-06|         6|
|54500|2016-06-13|         6|
|54441|2016-06-20|         4|
|54441|2016-06-27|         4|
|54441|2016-07-04|         4|
|54441|2016-07-11|         4|
 ----- ---------- ---------- 
  

Пробовал это

 import org.apache.spark.sql.expressions.Window

var s = Window.partitionBy("ID")
var df2 = df.withColumn("count", count.over(s))
  

это выдает ошибку

 error: ambiguous reference to overloaded definition,
both method count in object functions of type (columnName: String)org.apache.spark.sql.TypedColumn[Any,Long]
and  method count in object functions of type (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
match expected type ?
  

Комментарии:

1. Окно Spark — это то, что вам нужно

2. сделал это, я получаю ошибку

3. @Leothorn не могли бы вы предоставить подробную информацию об ошибке

4. Я также добавил ошибку

5. попробуйте count(df(«count»))

Ответ №1:

Следуйте приведенному ниже подходу:

  import spark.implicits._

val df1 = List(54500, 54500, 54500, 54500, 54500, 54500, 54441, 54441, 54441, 54441).toDF("ID")
val df2 = df1.groupBy("ID").count()
df1.join(df2, Seq("ID"), "left").show(false)

 ----- ----- 
|ID   |count|
 ----- ----- 
|54500|6    |
|54500|6    |
|54500|6    |
|54500|6    |
|54500|6    |
|54500|6    |
|54441|4    |
|54441|4    |
|54441|4    |
|54441|4    |
 ----- ----- 
  

Комментарии:

1. Ответ неверен, потому что вы уменьшаете количество строк в конечном фрейме данных.

2. О! извините, я этого не проверял. отредактировал сообщение, пожалуйста, посмотрите.