Как выполнить операцию слияния в spark Dataframe?

#scala #apache-spark #dataframe #apache-spark-sql

#scala #apache-spark #dataframe #apache-spark-sql

Вопрос:

У меня есть spark dataframe mainDF и deltaDF оба с соответствующей схемой.

Содержимое mainDF выглядит следующим образом:

 id | name | age
1  | abc  | 23
2  | xyz  | 34
3  | pqr  | 45
  

Содержимое deltaDF выглядит следующим образом:

 id | name | age
1  | lmn  | 56
4  | efg  | 37
  

Я хочу объединить deltaDF с mainDF на основе значения id . Итак, если my id уже существует в mainDF , то запись должна быть обновлена, а если id не существует, то следует добавить новую запись. Итак, результирующий фрейм данных должен быть таким:

 id | name | age
1  | lmn  | 56
2  | xyz  | 34
3  | pqr  | 45
4  | efg  | 37
  

Это мой текущий код, и он работает:

   val updatedDF = mainDF.as("main").join(deltaDF.as("delta"),$"main.id" === $"delta.id","inner").select($"main.id",$"main.name",$"main.age")
 mainDF= mainDF.except(updateDF).unionAll(deltaDF)
  

Однако здесь мне нужно снова явно указать столбцы списка в функции выбора, которая мне кажется накладной. Есть ли какой-либо другой лучший / более чистый подход для достижения того же?

Комментарии:

1. вы пробовали join() ?

2. На самом деле это не соответствует вашему вопросу, но вы также можете выполнить объединение с меткой времени (фиктивной, если у вас ее нет) и удалить строки с дублирующимся идентификатором и более старой меткой времени.

3. почему вы не использовали объединение?

4. Я использую объединение. Забыл добавить этот код.

Ответ №1:

Если вы не хотите предоставлять список столбцов явно, вы можете отобразить исходные столбцы DF, что-то вроде:

 .select(mainDF.columns.map(c => $"main.$c" as c): _*)
  

Кстати, вы можете сделать это без union после join : вы можете использовать outer join для получения записей, которые не существуют в обоих DFS, а затем использовать coalesce для «выбора» ненулевого значения, предпочитающего значения deltaDF . Таким образом, полное решение было бы чем-то вроде:

 val updatedDF = mainDF.as("main")
  .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
  .select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)

updatedDF.show
//  --- ---- --- 
// | id|name|age|
//  --- ---- --- 
// |  1| lmn| 56|
// |  3| pqr| 45|
// |  4| efg| 37|
// |  2| xyz| 34|
//  --- ---- --- 
  

Ответ №2:

Вы можете достичь этого, используя dropDuplicates и указав, в каком столбце вы не хотите никаких дубликатов.

Вот рабочий код :

  val a = (1,"lmn",56)::(2,"abc",23)::(3,"pqr",45)::Nil
 val b = (1,"opq",12)::(5,"dfg",78)::Nil

 val df1 = sc.parallelize(a).toDF
 val df2 = sc.parallelize(b).toDF

 df1.unionAll(df2).dropDuplicates("_1"::Nil).show()

 --- --- --- 
| _1| _2| _3|
 --- --- --- 
|  1|lmn| 56|
|  2|abc| 23|
|  3|pqr| 45|
|  5|dfg| 78|
 --- --- --- 
  

Комментарии:

1. Это не работает, потому что случайным образом удаляется одна повторяющаяся запись, в сценарии слияния должна быть выбрана последняя запись, игнорирующая старую

Ответ №3:

Другой способ сделать это: реализация pyspark

 updatedDF = mainDF.alias(“main”).join(deltaDF.alias(“delta”), main.id == delta.id,"left")
upsertDF = updatedDF.where(“main.id IS not null").select("main.*")
unchangedDF = updatedDF.where(“main.id IS NULL”).select("delta.*")
finalDF = upsertDF.union(unchangedDF)