как создать фрейм данных на основе даты первого появления и на основе дополнительных столбцов в каждом столбце id

#scala #apache-spark-sql

#scala #apache-spark-sql

Вопрос:

я пытаюсь создать фрейм данных со следующим условием: у меня есть несколько идентификаторов, несколько столбцов со значениями по умолчанию (0 или 1) и столбец startdate. Я хотел бы получить фрейм данных с отображаемыми значениями по умолчанию на основе первой начальной даты (default_date) и каждого идентификатора.

исходный df выглядит следующим образом:

  ---- ----- ----- ----- ----------- 
|id  |def_a|def_b|deb_c|date       |
 ---- ----- ----- ----- ----------- 
|  01|    1|    0|    1| 2019-01-31|
|  02|    1|    1|    0| 2018-12-31|
|  03|    1|    1|    1| 2018-10-31|
|  01|    1|    0|    1| 2018-09-30|
|  02|    1|    1|    0| 2018-08-31|
|  03|    1|    1|    0| 2018-07-31|
|  03|    1|    1|    1| 2019-05-31|
  

вот как я хотел бы это иметь:

  ---- ----- ----- ----- ----------- 
|id  |def_a|def_b|deb_c|date       |
 ---- ----- ----- ----- ----------- 
|  01|    1|    0|    1| 2018-09-30|
|  02|    1|    1|    0| 2018-08-31|
|  03|    1|    1|    1| 2018-07-31|
  

я попробовал следующий код:

 val w = Window.partitionBy($"id").orderBy($"date".asc) 
val reult = join3.withColumn("rn", row_number.over(w)).where($"def_a" === 1 || $"def_b" === 1 ||$"def_c" === 1).filter($"rn" >= 1).drop("rn")
  

результат.показать

Я был бы благодарен за любую помощь

Комментарии:

1. Привет, @Nika ты решил эту проблему?

2. Привет, пока нет. это не совсем тот результат, который мне нужен

3. Привет, Ника, так какой именно результат тебе нужен?

4. Я думал, вы ищете для каждой группы идентификаторов запись с минимальным значением даты, не так ли?

5. Извините, у меня была другая ошибка в моем коде. ваш код помог мне. Спасибо!

Ответ №1:

Это должно сработать для вас. Сначала назначьте минимальную дату исходному df, затем присоедините новый df2 к df.

 import org.apache.spark.sql.expressions.Window

val df = Seq(
(1,1,0,1,"2019-01-31"),
(2,1,1,0,"2018-12-31"),
(3,1,1,1,"2018-10-31"),
(1,1,0,1,"2018-09-30"),
(2,1,1,0,"2018-08-31"),
(3,1,1,0,"2018-07-31"),
(3,1,1,1,"2019-05-31"))
.toDF("id"  ,"def_a" , "def_b", "deb_c", "date")

val w = Window.partitionBy($"id").orderBy($"date".asc) 

val df2 = df.withColumn("date", $"date".cast("date"))
            .withColumn("min_date", min($"date").over(w))
            .select("id", "min_date")
            .distinct()

df.join(df2, df("id") === df2("id") amp;amp; df("date") === df2("min_date"))
.select(df("*"))
.show
  

И результат должен быть:

  --- ----- ----- ----- ---------- 
| id|def_a|def_b|deb_c|      date|
 --- ----- ----- ----- ---------- 
|  1|    1|    0|    1|2018-09-30|
|  2|    1|    1|    0|2018-08-31|
|  3|    1|    1|    0|2018-07-31|
 --- ----- ----- ----- ---------- 
  

Кстати, я полагаю, что у вас была небольшая ошибка в ваших ожидаемых результатах. Это (3, 1, 1, 0, 2018-07-31) не (3, 1, 1, 1, 2018-07-31)