#scala #apache-spark #join
#scala #apache-spark #Присоединиться
Вопрос:
Я хочу иметь гибкое условие соединения, которое можно передать, например, в виде строки (или любого другого предложения?). Например, в следующей инструкции выражение FLEXIBLE_CONDITION может меняться при разных запусках.
val df3 = df1.join(df2, FLEXIBLE_CONDITION, "fullouter")
Несколько примеров:
(1) df1(s"query") === df2 (s"query_df2")
(2) df1(s"id") === df2(s"id_df2") amp;amp; df1(s"item") === df2(s"item_df2")
(3) Or combination of (1) and (2) or any other condition
Необходимо отметить, что имена столбцов, на основе которых будет выполняться объединение, отличаются. Например, в (1), в df1 имя столбца — query, а в df2 — query_df2 и так далее.
FLEXIBLE_CONDITION не должно быть жестко запрограммировано, но может быть входным и может часто меняться. Или может быть автоматизировано на основе набора входных данных (например, имен столбцов).
Ответ №1:
Я понял это. Это то, что я искал:
val first : String = unique_attrs(0)
var expression : org.apache.spark.sql.Column = df1(first) === df2_r(s"$first" "_df2")
for (i <- 1 to unique_attrs.length - 1) {
val attr : String = unique_attrs(1)
expression = expression amp;amp; df1(attr) === df2_r(s"$attr" "_df2")
}
val df3 = df1.join(df2_r, expression, "fullouter")
Список атрибутов предоставляется в качестве входных данных (unique_attrs) для метода.
Ответ №2:
Вы можете указать выражение, которое следует использовать в join
подпись для этого
def join(right: Dataset[_], joinExprs: Column): DataFrame
Например,
val df1 = Seq(
("a1", "b1"),
("a2", "b2")
).toDF("a", "b")
val df2 = Seq(
("b1", "a1"),
("b2", "a2")
).toDF("b1", "a1")
df1.show
df2.show
вывод
--- ---
| a| b|
--- ---
| a1| b1|
| a2| b2|
--- ---
--- ---
| b1| a1|
--- ---
| b1| a1|
| b2| a2|
--- ---
вы можете создать любое выражение, которое хотите, и предоставить его для соединения
val expression = df1("a") === df2("a1")
val result = df1 join (df2, expression)
result.show
вывод
--- --- --- ---
| a| b| b1| a1|
--- --- --- ---
| a1| b1| b1| a1|
| a2| b2| b2| a2|
--- --- --- ---
UPD:
Вы можете использовать createOrReplaceTempView
, например
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
val res = spark.sql("select * from df1 inner join df2 on df1.a == df2.a1")
res.show
вывод
--- --- --- ---
| a| b| b1| a1|
--- --- --- ---
| a1| b1| b1| a1|
| a2| b2| b2| a2|
--- --- --- ---
Результат будет тем же, и вы можете предоставить sql-запрос в виде строки
Комментарии:
1. Спасибо за ваш комментарий. Использование выражения здесь снова требует жесткого кодирования условия. Условием может быть ввод, например, строка или любое другое предложение?
2. @Xan я обновил свое решение, возможно, это то, что вам было нужно