#scala #apache-spark-sql
#scala #apache-spark-sql
Вопрос:
я пытаюсь создать фрейм данных со следующим условием: у меня есть несколько идентификаторов, несколько столбцов со значениями по умолчанию (0 или 1) и столбец startdate. Я хотел бы получить фрейм данных с отображаемыми значениями по умолчанию на основе первой начальной даты (default_date) и каждого идентификатора.
исходный df выглядит следующим образом:
---- ----- ----- ----- -----------
|id |def_a|def_b|deb_c|date |
---- ----- ----- ----- -----------
| 01| 1| 0| 1| 2019-01-31|
| 02| 1| 1| 0| 2018-12-31|
| 03| 1| 1| 1| 2018-10-31|
| 01| 1| 0| 1| 2018-09-30|
| 02| 1| 1| 0| 2018-08-31|
| 03| 1| 1| 0| 2018-07-31|
| 03| 1| 1| 1| 2019-05-31|
вот как я хотел бы это иметь:
---- ----- ----- ----- -----------
|id |def_a|def_b|deb_c|date |
---- ----- ----- ----- -----------
| 01| 1| 0| 1| 2018-09-30|
| 02| 1| 1| 0| 2018-08-31|
| 03| 1| 1| 1| 2018-07-31|
я попробовал следующий код:
val w = Window.partitionBy($"id").orderBy($"date".asc)
val reult = join3.withColumn("rn", row_number.over(w)).where($"def_a" === 1 || $"def_b" === 1 ||$"def_c" === 1).filter($"rn" >= 1).drop("rn")
результат.показать
Я был бы благодарен за любую помощь
Комментарии:
1. Привет, @Nika ты решил эту проблему?
2. Привет, пока нет. это не совсем тот результат, который мне нужен
3. Привет, Ника, так какой именно результат тебе нужен?
4. Я думал, вы ищете для каждой группы идентификаторов запись с минимальным значением даты, не так ли?
5. Извините, у меня была другая ошибка в моем коде. ваш код помог мне. Спасибо!
Ответ №1:
Это должно сработать для вас. Сначала назначьте минимальную дату исходному df, затем присоедините новый df2 к df.
import org.apache.spark.sql.expressions.Window
val df = Seq(
(1,1,0,1,"2019-01-31"),
(2,1,1,0,"2018-12-31"),
(3,1,1,1,"2018-10-31"),
(1,1,0,1,"2018-09-30"),
(2,1,1,0,"2018-08-31"),
(3,1,1,0,"2018-07-31"),
(3,1,1,1,"2019-05-31"))
.toDF("id" ,"def_a" , "def_b", "deb_c", "date")
val w = Window.partitionBy($"id").orderBy($"date".asc)
val df2 = df.withColumn("date", $"date".cast("date"))
.withColumn("min_date", min($"date").over(w))
.select("id", "min_date")
.distinct()
df.join(df2, df("id") === df2("id") amp;amp; df("date") === df2("min_date"))
.select(df("*"))
.show
И результат должен быть:
--- ----- ----- ----- ----------
| id|def_a|def_b|deb_c| date|
--- ----- ----- ----- ----------
| 1| 1| 0| 1|2018-09-30|
| 2| 1| 1| 0|2018-08-31|
| 3| 1| 1| 0|2018-07-31|
--- ----- ----- ----- ----------
Кстати, я полагаю, что у вас была небольшая ошибка в ваших ожидаемых результатах. Это (3, 1, 1, 0, 2018-07-31)
не (3, 1, 1, 1, 2018-07-31)