Scala Spark: как дополнить подсписок внутри фрейма данных дополнительными значениями?

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

Допустим, у меня есть фрейм данных, originalDF который выглядит следующим образом

  -------- -------------- 
|data_id |data_list     |
 -------- -------------- 
|       3| [a, b, d]    |
|       2|[c, a, b, e]  |
|       1|         [g]  |
 -------- -------------- 
  

И у меня есть другой фрейм данных, extraInfoDF который выглядит следующим образом:

  -------- -------------- 
|data_id |data_list     |
 -------- -------------- 
|       3| [q, w, x, a] |
|       2|[r, q, l, p]  |
|       1| [z, k, j, f] |
 -------- -------------- 
  

Для двух data_lists входов originalDF , которые короче 4, я хочу добавить данные из соответствующего data_lists входа extraInfoDF , чтобы каждый список имел длину 4.

Результирующий фрейм данных будет выглядеть следующим образом:

  -------- -------------- 
|data_id |data_list     |
 -------- -------------- 
|       3| [a, b, d, q] |
|       2|[c, a, b, e]  |
|       1|[g, z, k, j]  |
 -------- -------------- 
  

Я пытался найти какой-нибудь способ перебирать каждую строку в фрейме данных и добавлять к списку таким образом, но возникли проблемы. Теперь мне интересно, есть ли более простой способ добиться этого с помощью UDF?

Комментарии:

1. Вы можете сначала объединить их, а затем изменить его размер.

2. Если я объединю их, порядок останется прежним? Итак, [c, a, b, e] все равно будет первым 4 для data_id 2, когда я уменьшу его размер до 4?

Ответ №1:

Вы можете добавить второй список к первому и take самым левым N элементам в UDF, как показано ниже:

 import org.apache.spark.sql.functions._
import spark.implicits._

def padList(n: Int) = udf{ (l1: Seq[String], l2: Seq[String]) =>
  (l1    l2).take(n)
}

val df1 = Seq(
  (3, Seq("a", "b", "d")),
  (2, Seq("c", "a", "b", "e")),
  (1, Seq("g"))
).toDF("data_id", "data_list")

val df2 = Seq(
  (3, Seq("q", "w", "x", "a")),
  (2, Seq("r", "q", "l", "p")),
  (1, Seq("z", "k", "j", "f"))
).toDF("data_id", "data_list")

df1.
  join(df2, "data_id").
  select($"data_id", padList(4)(df1("data_list"), df2("data_list")).as("data_list")).
  show
//  ------- ------------ 
// |data_id|   data_list|
//  ------- ------------ 
// |      3|[a, b, d, q]|
// |      2|[c, a, b, e]|
// |      1|[g, z, k, j]|
//  ------- ------------