В spark как использовать спецификацию окна с агрегатными функциями

#scala #apache-spark #apache-spark-sql

#скала #apache-spark #apache-spark-sql

Вопрос:

У меня есть фрейм данных spark, который выглядит как:

  ------------ --------- --------------------------------------------------------------------------------------------------------- 
|parent_key  |id       |value                                                       |raw_is_active           |updated_at         |
 ------------ --------- ------------------------------------------------------------ ------------------------ ------------------- 
|1           |2        |[, 0, USER, 2020-12-11 04:50:40, 2020-12-11 04:50:40,]      |[2020-12-11 04:50:40, 0]|2020-12-11 04:50:40|
|1           |2        |[testA, 0, USER, 2020-12-11 04:50:40, 2020-12-11 17:18:00,] |null                    |2020-12-11 17:18:00|
|1           |2        |[testA, 0, USER, 2020-12-11 04:50:40, 2020-12-11 17:19:56,] |null                    |2020-12-11 17:19:56|
|1           |2        |[testA, 1, USER, 2020-12-11 04:50:40, 2020-12-11 17:20:24,] |[2020-12-11 17:20:24, 1]|2020-12-11 17:20:24|
|2           |3        |[testB, 0, USER, 2020-12-11 17:24:03, 2020-12-11 17:24:03,] |[2020-12-11 17:24:03, 0]|2020-12-11 17:24:03|
|3           |4        |[testC, 0, USER, 2020-12-11 17:27:36, 2020-12-11 17:27:36,] |[2020-12-11 17:27:36, 0]|2020-12-11 17:27:36|
 ------------ --------- ------------------------------------------------------------ ------------------------ ------------------- 
 

Схема:

 root
 |-- parent_key: long (nullable = true)
 |-- id: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- is_active: integer (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- created_at: timestamp (nullable = true)
 |    |-- updated_at: timestamp (nullable = true)
 |-- raw_is_active: struct (nullable = true)
 |    |-- updated_at: timestamp (nullable = true)
 |    |-- value: integer (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 

Я ищу результат:

  ------------ --------- ------------------------------------------------------------ --------------------------------------------------- ------------------- 
|parent_key  |id       |value                                                       |raw_is_active                                      |updated_at         |
 ------------ --------- --------------------------------------------------------------------------------------------------------- -------------------------- 
|1           |2        |[testA, 1, USER, 2020-12-11 04:50:40, 2020-12-11 17:20:24]  |[[2020-12-11 04:50:40, 0],[2020-12-11 17:20:24, 1]]|2020-12-11 04:50:40|
|2           |3        |[testB, 0, USER, 2020-12-11 17:24:03, 2020-12-11 17:24:03]  |[2020-12-11 17:24:03, 0]                           |2020-12-11 17:24:03|
|3           |4        |[testC, 0, USER, 2020-12-11 17:27:36, 2020-12-11 17:27:36]  |[2020-12-11 17:27:36, 0]                           |2020-12-11 17:27:36|
 ------------ --------- --------------------------------------------------------------------------------------------------------- -------------------------- 
 

Итак, на основе updated_at столбца я хочу сохранить последнюю строку, а также создать массив raw_is_active для всех строк для данного id .

Я знаю, что могу выбрать последнюю value версию, используя код:

  val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)

    dataFrame
      .withColumn("maxTS", first("updated_at").over(windowSpec))
      .select("*").where(col("maxTS") === col("updated_at"))
      .drop("maxTS")
 

Но не уверен, как я могу также создать набор для raw_is_active столбца.

Или я могу полностью использовать группировку по функциям, например:

  dataFrame
      .groupBy("parent_key", "id")
      .agg(collect_list("value") as "value_list", collect_set("raw_is_active") as "active_list")
      .withColumn("value", col("value_list")(size(col("value_list")).minus(1)))
      .drop("value_list")
 

Для вышеупомянутого я не уверен

  1. .withColumn("value", col("value_list")(size(col("value_list")).minus(1))) всегда будет давать мне последнее значение
  2. Учитывая использование collect_list и collect_set , эффективен ли этот код?

ОБНОВЛЕНИЕ Благодаря @mck я смог заставить его работать с кодом:

 val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)
val windowSpecSet = Window.partitionBy("id").orderBy(col("updated_at"))

val df2 = dataFrame.withColumn(
    "rn",
    row_number().over(windowSpec)
).withColumn(
    "active_list",
    collect_set("raw_is_active").over(windowSpecSet)
).drop("raw_is_active").filter("rn = 1")
 

Однако код занимает больше времени, чем мой существующий код:

  dataFrame
      .groupBy("parent_key", "id")
      .agg(collect_list("value") as "value_list", collect_set("raw_is_active") as "active_list")
      .withColumn("value", col("value_list")(size(col("value_list")).minus(1)))
      .drop("value_list")
 

У меня сложилось впечатление , что функция window будет работать лучше , чем groupBy and agg .

Ответ №1:

Назначьте a row_number для каждой строки в каждом разделе идентификаторов и отфильтруйте строки с помощью row_number = 1 :

 val windowSpec = Window.partitionBy("id").orderBy(col("updated_at").desc)

val df2 = dataFrame.withColumn(
    "rn",
    row_number().over(windowSpec)
).withColumn(
    "active_list",
    array_sort(collect_set("raw_is_active").over(windowSpec))
).drop("raw_is_active").filter("rn = 1")

 

Комментарии:

1. спасибо за помощь. Код не работает для меня. В то время как последний value столбец поступает с rn = 1, набор значений active_list достигает максимума (rn)

2. Я смог заставить его работать, используя две спецификации. Обновлено описание моего вопроса.

3. collect_set не гарантирует порядок, независимо от того, как вы указываете окно. В этом небольшом случае это просто происходит в определенном порядке, потому что вы не выполняете в кластере. Если они вам нужны в определенном порядке, используйте array_sort . Смотрите Документы по адресу spark.apache.org/docs/latest/api/java/org/apache/spark/sql /…

4. этот метод может быть немного медленнее, чем ваш метод, но он потребляет меньше памяти, поскольку не требует хранения всех value в списке. Если их много value , вы можете столкнуться с ошибками памяти в spark.

5. проблема не в порядке сбора. Коллекция устанавливается rn = 4 вместо rn = 1 . Чтобы заставить это работать, необходимо использовать другую спецификацию WindowSpec, которая может вызывать проблемы с производительностью. Ищете способ сделать это с помощью одной спецификации