гибкое условие соединения в Spark (Scala)

#scala #apache-spark #join

#scala #apache-spark #Присоединиться

Вопрос:

Я хочу иметь гибкое условие соединения, которое можно передать, например, в виде строки (или любого другого предложения?). Например, в следующей инструкции выражение FLEXIBLE_CONDITION может меняться при разных запусках.

 val df3 = df1.join(df2, FLEXIBLE_CONDITION, "fullouter")
  

Несколько примеров:

  (1) df1(s"query") === df2 (s"query_df2") 
 (2) df1(s"id") === df2(s"id_df2") amp;amp; df1(s"item") === df2(s"item_df2")
 (3) Or combination of (1) and (2) or any other condition
  

Необходимо отметить, что имена столбцов, на основе которых будет выполняться объединение, отличаются. Например, в (1), в df1 имя столбца — query, а в df2 — query_df2 и так далее.

FLEXIBLE_CONDITION не должно быть жестко запрограммировано, но может быть входным и может часто меняться. Или может быть автоматизировано на основе набора входных данных (например, имен столбцов).

Ответ №1:

Я понял это. Это то, что я искал:

  val first :  String = unique_attrs(0)
 var expression : org.apache.spark.sql.Column = df1(first) === df2_r(s"$first"   "_df2")
 for (i <- 1 to unique_attrs.length - 1) {
   val attr : String = unique_attrs(1)
   expression = expression amp;amp; df1(attr) === df2_r(s"$attr"   "_df2")
 }

 val df3 = df1.join(df2_r, expression, "fullouter")
  

Список атрибутов предоставляется в качестве входных данных (unique_attrs) для метода.

Ответ №2:

Вы можете указать выражение, которое следует использовать в join

подпись для этого

 def join(right: Dataset[_], joinExprs: Column): DataFrame
  

Например,

 val df1 = Seq(
    ("a1", "b1"),
    ("a2", "b2")
).toDF("a", "b")

val df2 = Seq(
    ("b1", "a1"),
    ("b2", "a2")
).toDF("b1", "a1")

df1.show
df2.show
  

вывод

  --- --- 
|  a|  b|
 --- --- 
| a1| b1|
| a2| b2|
 --- --- 

 --- --- 
| b1| a1|
 --- --- 
| b1| a1|
| b2| a2|
 --- --- 
  

вы можете создать любое выражение, которое хотите, и предоставить его для соединения

 val expression = df1("a") === df2("a1")
val result = df1 join (df2, expression)

result.show
  

вывод

  --- --- --- --- 
|  a|  b| b1| a1|
 --- --- --- --- 
| a1| b1| b1| a1|
| a2| b2| b2| a2|
 --- --- --- --- 
  

UPD:

Вы можете использовать createOrReplaceTempView , например

 df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

val res = spark.sql("select * from df1 inner join df2 on df1.a == df2.a1")
res.show
  

вывод

  --- --- --- --- 
|  a|  b| b1| a1|
 --- --- --- --- 
| a1| b1| b1| a1|
| a2| b2| b2| a2|
 --- --- --- --- 
  

Результат будет тем же, и вы можете предоставить sql-запрос в виде строки

Комментарии:

1. Спасибо за ваш комментарий. Использование выражения здесь снова требует жесткого кодирования условия. Условием может быть ввод, например, строка или любое другое предложение?

2. @Xan я обновил свое решение, возможно, это то, что вам было нужно