#scala #apache-spark #apache-spark-sql
#скала #apache-spark #apache-spark-sql
Вопрос:
У меня есть фрейм данных spark, который выглядит как:
------------ --------- ---------------------------------------------------------------------------------------------------------
|parent_key |id |value |raw_is_active |updated_at |
------------ --------- ------------------------------------------------------------ ------------------------ -------------------
|1 |2 |[, 0, USER, 2020-12-11 04:50:40, 2020-12-11 04:50:40,] |[2020-12-11 04:50:40, 0]|2020-12-11 04:50:40|
|1 |2 |[testA, 0, USER, 2020-12-11 04:50:40, 2020-12-11 17:18:00,] |null |2020-12-11 17:18:00|
|1 |2 |[testA, 0, USER, 2020-12-11 04:50:40, 2020-12-11 17:19:56,] |null |2020-12-11 17:19:56|
|1 |2 |[testA, 1, USER, 2020-12-11 04:50:40, 2020-12-11 17:20:24,] |[2020-12-11 17:20:24, 1]|2020-12-11 17:20:24|
|2 |3 |[testB, 0, USER, 2020-12-11 17:24:03, 2020-12-11 17:24:03,] |[2020-12-11 17:24:03, 0]|2020-12-11 17:24:03|
|3 |4 |[testC, 0, USER, 2020-12-11 17:27:36, 2020-12-11 17:27:36,] |[2020-12-11 17:27:36, 0]|2020-12-11 17:27:36|
------------ --------- ------------------------------------------------------------ ------------------------ -------------------
Схема:
root
|-- parent_key: long (nullable = true)
|-- id: string (nullable = true)
|-- value: struct (nullable = true)
| |-- first_name: string (nullable = true)
| |-- is_active: integer (nullable = true)
| |-- source: string (nullable = true)
| |-- created_at: timestamp (nullable = true)
| |-- updated_at: timestamp (nullable = true)
|-- raw_is_active: struct (nullable = true)
| |-- updated_at: timestamp (nullable = true)
| |-- value: integer (nullable = true)
|-- updated_at: timestamp (nullable = true)
Я ищу результат:
------------ --------- ------------------------------------------------------------ --------------------------------------------------- -------------------
|parent_key |id |value |raw_is_active |updated_at |
------------ --------- --------------------------------------------------------------------------------------------------------- --------------------------
|1 |2 |[testA, 1, USER, 2020-12-11 04:50:40, 2020-12-11 17:20:24] |[[2020-12-11 04:50:40, 0],[2020-12-11 17:20:24, 1]]|2020-12-11 04:50:40|
|2 |3 |[testB, 0, USER, 2020-12-11 17:24:03, 2020-12-11 17:24:03] |[2020-12-11 17:24:03, 0] |2020-12-11 17:24:03|
|3 |4 |[testC, 0, USER, 2020-12-11 17:27:36, 2020-12-11 17:27:36] |[2020-12-11 17:27:36, 0] |2020-12-11 17:27:36|
------------ --------- --------------------------------------------------------------------------------------------------------- --------------------------
Итак, на основе updated_at
столбца я хочу сохранить последнюю строку, а также создать массив raw_is_active
для всех строк для данного id
.
Я знаю, что могу выбрать последнюю value
версию, используя код:
val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)
dataFrame
.withColumn("maxTS", first("updated_at").over(windowSpec))
.select("*").where(col("maxTS") === col("updated_at"))
.drop("maxTS")
Но не уверен, как я могу также создать набор для raw_is_active
столбца.
Или я могу полностью использовать группировку по функциям, например:
dataFrame
.groupBy("parent_key", "id")
.agg(collect_list("value") as "value_list", collect_set("raw_is_active") as "active_list")
.withColumn("value", col("value_list")(size(col("value_list")).minus(1)))
.drop("value_list")
Для вышеупомянутого я не уверен
.withColumn("value", col("value_list")(size(col("value_list")).minus(1)))
всегда будет давать мне последнее значение- Учитывая использование
collect_list
иcollect_set
, эффективен ли этот код?
ОБНОВЛЕНИЕ Благодаря @mck я смог заставить его работать с кодом:
val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)
val windowSpecSet = Window.partitionBy("id").orderBy(col("updated_at"))
val df2 = dataFrame.withColumn(
"rn",
row_number().over(windowSpec)
).withColumn(
"active_list",
collect_set("raw_is_active").over(windowSpecSet)
).drop("raw_is_active").filter("rn = 1")
Однако код занимает больше времени, чем мой существующий код:
dataFrame
.groupBy("parent_key", "id")
.agg(collect_list("value") as "value_list", collect_set("raw_is_active") as "active_list")
.withColumn("value", col("value_list")(size(col("value_list")).minus(1)))
.drop("value_list")
У меня сложилось впечатление , что функция window будет работать лучше , чем groupBy
and agg
.
Ответ №1:
Назначьте a row_number
для каждой строки в каждом разделе идентификаторов и отфильтруйте строки с помощью row_number = 1
:
val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)
val df2 = dataFrame.withColumn(
"rn",
row_number().over(windowSpec)
).withColumn(
"active_list",
array_sort(collect_set("raw_is_active").over(windowSpec))
).drop("raw_is_active").filter("rn = 1")
Комментарии:
1. спасибо за помощь. Код не работает для меня. В то время как последний
value
столбец поступает с rn = 1, набор значенийactive_list
достигает максимума (rn)2. Я смог заставить его работать, используя две спецификации. Обновлено описание моего вопроса.
3.
collect_set
не гарантирует порядок, независимо от того, как вы указываете окно. В этом небольшом случае это просто происходит в определенном порядке, потому что вы не выполняете в кластере. Если они вам нужны в определенном порядке, используйтеarray_sort
. Смотрите Документы по адресу spark.apache.org/docs/latest/api/java/org/apache/spark/sql /…4. этот метод может быть немного медленнее, чем ваш метод, но он потребляет меньше памяти, поскольку не требует хранения всех
value
в списке. Если их многоvalue
, вы можете столкнуться с ошибками памяти в spark.5. проблема не в порядке сбора. Коллекция устанавливается
rn = 4
вместоrn = 1
. Чтобы заставить это работать, необходимо использовать другую спецификацию WindowSpec, которая может вызывать проблемы с производительностью. Ищете способ сделать это с помощью одной спецификации