Как получить значение из предыдущей группы в spark?

#scala #apache-spark #apache-spark-sql

Вопрос:

Мне нужно получить значение предыдущей группы в spark и установить его в текущую группу. Как я могу этого достичь? Я должен заказывать по количеству вместо TEXT_NUM.

Упорядочение по TEXT_NUM невозможно, поскольку события повторяются во времени, как показано на графах 10 и 11.

Я пытаюсь использовать следующий код:

    val spark = SparkSession.builder()
      .master("spark://spark-master:7077")
      .getOrCreate()

    val df = spark
      .createDataFrame(
        Seq[(Int, String, Int)](
          (0, "", 0),
          (1, "", 0),
          (2, "A", 1),
          (3, "A", 1),
          (4, "A", 1),
          (5, "B", 2),
          (6, "B", 2),
          (7, "B", 2),
          (8, "C", 3),
          (9, "C", 3),
          (10, "A", 1),
          (11, "A", 1)
        ))
      .toDF("count", "TEXT", "TEXT_NUM")

    val w1 = Window
      .orderBy("count")
      .rangeBetween(Window.unboundedPreceding, -1)
    df
      .withColumn("LAST_VALUE", last("TEXT_NUM").over(w1))
      .orderBy("count")
      .show()
 

Результат:

  ----- ---- -------- ---------- 
|count|TEXT|TEXT_NUM|LAST_VALUE|
 ----- ---- -------- ---------- 
|    0|    |       0|      null|
|    1|    |       0|         0|
|    2|   A|       1|         0|
|    3|   A|       1|         1|
|    4|   A|       1|         1|
|    5|   B|       2|         1|
|    6|   B|       2|         2|
|    7|   B|       2|         2|
|    8|   C|       3|         2|
|    9|   C|       3|         3|
|   10|   A|       1|         3|
|   11|   A|       1|         1|
 ----- ---- -------- ---------- 
 

Желаемый результат:

  ----- ---- -------- ---------- 
|count|TEXT|TEXT_NUM|LAST_VALUE|
 ----- ---- -------- ---------- 
|    0|    |       0|      null|
|    1|    |       0|      null|
|    2|   A|       1|         0|
|    3|   A|       1|         0|
|    4|   A|       1|         0|
|    5|   B|       2|         1|
|    6|   B|       2|         1|
|    7|   B|       2|         1|
|    8|   C|       3|         2|
|    9|   C|       3|         2|
|   10|   A|       1|         3|
|   11|   A|       1|         3|
 ----- ---- -------- ---------- 
 

Ответ №1:

Рассмотрите возможность использования функции окна last(columnName, ignoreNulls) для заполнения null столбцов в столбце, состоящем из предыдущего «text_num» на границах групп, как показано ниже:

 val df = Seq(
  (0, "", 0), (1, "", 0),
  (2, "A", 1), (3, "A", 1), (4, "A", 1),
  (5, "B", 2), (6, "B", 2), (7, "B", 2),
  (8, "C", 3), (9, "C", 3),
  (10, "A", 1), (11, "A", 1)
).toDF("count", "text", "text_num")

import org.apache.spark.sql.expressions.Window
val w1 = Window.orderBy("count")
val w2 = w1.rowsBetween(Window.unboundedPreceding, 0)

df.
  withColumn("prev_num", lag("text_num", 1).over(w1)).
  withColumn("last_change", when($"text_num" =!= $"prev_num", $"prev_num")).
  withColumn("last_value", last("last_change", ignoreNulls=true).over(w2)).
  show
/*
 ----- ---- -------- -------- ----------- ---------- 
|count|text|text_num|prev_num|last_change|last_value|
 ----- ---- -------- -------- ----------- ---------- 
|    0|    |       0|    null|       null|      null|
|    1|    |       0|       0|       null|      null|
|    2|   A|       1|       0|          0|         0|
|    3|   A|       1|       1|       null|         0|
|    4|   A|       1|       1|       null|         0|
|    5|   B|       2|       1|          1|         1|
|    6|   B|       2|       2|       null|         1|
|    7|   B|       2|       2|       null|         1|
|    8|   C|       3|       2|          2|         2|
|    9|   C|       3|       3|       null|         2|
|   10|   A|       1|       3|          3|         3|
|   11|   A|       1|       1|       null|         3|
 ----- ---- -------- -------- ----------- ---------- 
*/
 

Промежуточные столбцы сохраняются в выходных данных для ссылок. Просто бросьте их, если они не нужны.