#scala #apache-spark #dataframe #apache-spark-sql
#scala #apache-spark #dataframe #apache-spark-sql
Вопрос:
У меня есть spark dataframe mainDF
и deltaDF
оба с соответствующей схемой.
Содержимое mainDF выглядит следующим образом:
id | name | age
1 | abc | 23
2 | xyz | 34
3 | pqr | 45
Содержимое deltaDF
выглядит следующим образом:
id | name | age
1 | lmn | 56
4 | efg | 37
Я хочу объединить deltaDF
с mainDF
на основе значения id
. Итак, если my id
уже существует в mainDF
, то запись должна быть обновлена, а если id
не существует, то следует добавить новую запись. Итак, результирующий фрейм данных должен быть таким:
id | name | age
1 | lmn | 56
2 | xyz | 34
3 | pqr | 45
4 | efg | 37
Это мой текущий код, и он работает:
val updatedDF = mainDF.as("main").join(deltaDF.as("delta"),$"main.id" === $"delta.id","inner").select($"main.id",$"main.name",$"main.age")
mainDF= mainDF.except(updateDF).unionAll(deltaDF)
Однако здесь мне нужно снова явно указать столбцы списка в функции выбора, которая мне кажется накладной. Есть ли какой-либо другой лучший / более чистый подход для достижения того же?
Комментарии:
1. вы пробовали
join()
?2. На самом деле это не соответствует вашему вопросу, но вы также можете выполнить объединение с меткой времени (фиктивной, если у вас ее нет) и удалить строки с дублирующимся идентификатором и более старой меткой времени.
3. почему вы не использовали объединение?
4. Я использую объединение. Забыл добавить этот код.
Ответ №1:
Если вы не хотите предоставлять список столбцов явно, вы можете отобразить исходные столбцы DF, что-то вроде:
.select(mainDF.columns.map(c => $"main.$c" as c): _*)
Кстати, вы можете сделать это без union
после join
: вы можете использовать outer
join для получения записей, которые не существуют в обоих DFS, а затем использовать coalesce
для «выбора» ненулевого значения, предпочитающего значения deltaDF
. Таким образом, полное решение было бы чем-то вроде:
val updatedDF = mainDF.as("main")
.join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
.select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)
updatedDF.show
// --- ---- ---
// | id|name|age|
// --- ---- ---
// | 1| lmn| 56|
// | 3| pqr| 45|
// | 4| efg| 37|
// | 2| xyz| 34|
// --- ---- ---
Ответ №2:
Вы можете достичь этого, используя dropDuplicates
и указав, в каком столбце вы не хотите никаких дубликатов.
Вот рабочий код :
val a = (1,"lmn",56)::(2,"abc",23)::(3,"pqr",45)::Nil
val b = (1,"opq",12)::(5,"dfg",78)::Nil
val df1 = sc.parallelize(a).toDF
val df2 = sc.parallelize(b).toDF
df1.unionAll(df2).dropDuplicates("_1"::Nil).show()
--- --- ---
| _1| _2| _3|
--- --- ---
| 1|lmn| 56|
| 2|abc| 23|
| 3|pqr| 45|
| 5|dfg| 78|
--- --- ---
Комментарии:
1. Это не работает, потому что случайным образом удаляется одна повторяющаяся запись, в сценарии слияния должна быть выбрана последняя запись, игнорирующая старую
Ответ №3:
Другой способ сделать это: реализация pyspark
updatedDF = mainDF.alias(“main”).join(deltaDF.alias(“delta”), main.id == delta.id,"left")
upsertDF = updatedDF.where(“main.id IS not null").select("main.*")
unchangedDF = updatedDF.where(“main.id IS NULL”).select("delta.*")
finalDF = upsertDF.union(unchangedDF)