Как заменить нули при слиянии и добавлении

#scala #apache-spark #dataframe #merge

#scala #apache-spark #фрейм данных #слияние

Вопрос:

У меня есть два фрейма данных,

 DF1
 ---- ------- ------ ------ 
|id  | pnl   |value2|value3|
 ---- ------- ------ ------ 
|  1 |10     |20    |30    |
|  2 |20     |30    |40    |
|  3 |30     |40    |50    |
 ---- ------- ------ ------ 

DF2
 ---- ------- ------ ------ 
|id  | pnl   |value2|value3|
 ---- ------- ------ ------ 
|  1 |100    |200   |300   |
|  2 |200    |300   |400   |
|  3 |300    |400   |500   |
 ---- ------- ------ ------ 
  

Я пытаюсь объединить эти два фрейма данных по идентификатору и добавить столбцы значений вместе. Итак, получите что-то вроде этого.

  ---- ------- ------ ------ 
|id  | pnl   |value2|value3|
 ---- ------- ------ ------ 
|  1 |100 10 |200 20|300 30|
|  2 |200 20 |300 30|400 40|
|  3 |300 30 |400 40|500 50|
 ---- ------- ------ ------ 
  

Это отлично работает, когда я использую

 // extract the names of the columns to sum
val cols = df1.columns.filter(_!="id") 

// join and sum
val result = df1
    .join(df2,Seq("id"), "full_outer")
    .select( col("id")  : cols.map(c=>df1(c) df2(c) as c) : _*)
  

Но когда в одном из фреймов данных отсутствует один идентификатор. Например

 DF1
 ---- ------- ------ ------ 
|id  | pnl   |value2|value3|
 ---- ------- ------ ------ 
|  1 |10     |20    |30    |
|  2 |20     |30    |40    |
|  3 |30     |40    |50    |
|  4 |40     |40    |40
 ---- ------- ------ ------ 

DF2
 ---- ------- ------ ------ 
|id  | pnl   |value2|value3|
 ---- ------- ------ ------ 
|  1 |100    |200   |300   |
|  2 |200    |300   |400   |
|  3 |300    |400   |500   |
 ---- ------- ------ ------ 
  

После слияния я получаю следующие значения, используя операцию, о которой я упоминал выше.

  ---- ------- ------ ------ 
|id  | pnl   |value2|value3|
 ---- ------- ------ ------ 
|  1 |100 10 |200 20|300 30|
|  2 |200 20 |300 30|400 40|
|  3 |300 30 |400 40|500 50|
|  4 |null   |null  |null  |
 ---- ------- ------ ------ 
  

Я понимаю, что я получаю это, потому что у меня нет этого идентификатора в df2. Итак, один из способов, которым я думал решить эту проблему, — использовать .na.fill (0.0) после слияния

 // join and sum
val result = df1
    .join(df2,Seq("id"), "full_outer").na.fill(0.0)
    .select( col("id")  : cols.map(c=>df1(c) df2(c) as c) : _*)
  

Но spark, похоже, это не нравится, и я получаю следующую ошибку.

org.apache.spark.sql.AnalysisException: Ссылка ‘pnl’ неоднозначна, может быть: pnl, pnl.;

Кто-нибудь знает обходной путь для этого? Спасибо

Ответ №1:

Вы могли бы использовать cols.map(c => coalesce(df1(c), lit(0)) coalesce(df2(c), lit(0)) as c)

Комментарии:

1. Пробовал. Можете ли вы передать 0 в качестве второго аргумента в функцию объединения? Запрашивается столбец

2. Пожалуйста, используйте lit(0)