Обновление некоторых значений строк в искровом фрейме данных

#scala #apache-spark #dataframe #apache-spark-sql

#scala #apache-spark #фрейм данных #apache-spark-sql

Вопрос:

У меня есть фрейм данных, который я хочу объединить с другим фреймом данных, но только для воздействия на определенные ячейки, а не на целую строку.

Старый фрейм данных:

 ##  --- ---- ---- 
## |key|val1|val2|
##  --- ---- ---- 
## |  1|  aa|  ab|
## |  2|  bb|  bc|
##  --- ---- ---- 
  

Новый фрейм данных:

 ##  --- ---- 
## |key|val1|
##  --- ---- 
## |  2| bbb|
##  --- ---- 
  

Результат:

 ##  --- ---- ---- 
## |key|val1|val2|
##  --- ---- ---- 
## |  1|  aa|  ab|
## |  2| bbb|  bc|
##  --- ---- ---- 
  

В этом случае ключ уникален, поэтому строка, на которую будет оказано влияние, всегда будет идентифицироваться. Старый фрейм данных также всегда будет содержать ключи из нового фрейма данных.

Поскольку фреймы данных неизменяемы, мне придется вызвать withColumn , чтобы создать новый, предположительно, передав какой-то UDF, но я немного теряюсь, когда дело доходит до того, что должен содержать этот UDF.

Ответ №1:

Вам нужно использовать внешнее соединение, чтобы получить ожидаемый результат :

 scala> val oldDf = Seq((1, "aa", "ab"), (2, "bb", "bc")).toDF("key", "val1", "val2").as("old")
// oldDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: int, val1: string ... 1 more field]
scala> val newDf = Seq((2, "bbb")).toDF("key", "val1").as("new")
// newDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: int, val1: string]

scala> oldDf.join(newDf, Seq("key"), "outer").select($"key", coalesce($"new.val1", $"old.val1").alias("val1"), $"val2").show
//  --- ---- ---- 
// |key|val1|val2|
//  --- ---- ---- 
// |  1|  aa|  ab| 
// |  2| bbb|  bc|
//  --- ---- ---- 
  

Примечание: coalesce будет выбрано первое ненулевое значение между new.val1 и old.val1 .

Комментарии:

1. что, если я хочу суммировать два значения из newdf, могу ли я просто добавить оба столбца в объединение??

2. что вы подразумеваете под «первым ненулевым значением»??

3. @SundeepPidugu Это означает, что если new.val1 значение не равно null , значение для key будет new.val1 , в противном случае, если old.val1 не равно null , значение для key будет old.val1 , в противном случае значение key будет null . Это базовый диалект SQL.

4. Итак, я хочу заменить столбец суммой столбцов в новом Df, затем я должен передать его в качестве первого параметра в coalesce, чтобы отразить значения, верно?

5. Для меня это работает как по волшебству! ^^ Хотя в Databricks мне приходилось вызывать столбцы следующим образом: oldFf(«val1»)