#sql #join #pyspark
#sql #Присоединиться #pyspark
Вопрос:
Предположим, у меня есть две таблицы A и B. Пусть их структура будет примерно такой:
A
:
------------------------------
col_1 | col_2 | col_3 | col_4
------------------------------
1 | A | a | i
2 | B | b | ii
3 | C | c | iii
4 | D | d | iv
5 | E | e | v
------------------------------
B
:
---------------
col_1 | col_3
---------------
1 | null
3 | c
null| b
2 | null
--------------
Уверен, что имена столбцов в B совпадают с именами в A, и я хочу объединить их, используя OR
условие среди различных столбцов. Единственная загвоздка в том, что количество столбцов в B неизвестно.
Как я могу выполнить объединение?
Псевдокод того, что я хочу сделать, будет выглядеть так:
select *
from A
join B
on A.col_1 == B.col_1
OR A.col_2 == B.col_2
......
OR A.col_k == B.col_k --where k is the total number of columns in B
Я создал следующую строку для spark.sql
, но я искал более Pyspark-ic
способ сделать это :
sql_query = 'select s.* from dfA s join dfB on '
#join using or conditions
for i in dfB.columns:
sql_query = 'dfA.' i ' == dfB.' i ' OR '
#remove the last extra 'OR'
sql_query = sql_query[:-3]
spark.sql(sql_query)
Описанный выше подход требует создания временных представлений, чтобы к ним можно было получить доступ в sqlContext
.
Комментарии:
1. Как может быть неизвестно количество столбцов в таблице? В любом случае SQL, как правило, не имеет синтаксиса для переменного количества столбцов в
join
условии.2. B автоматически генерируется с использованием некоторой логики. Я хочу выполнить итерацию по всем столбцам в B, если это возможно. Вышеуказанное было реализовано в Pyspark.
3. Я имел в виду, что количество столбцов в B может быть определено только во время выполнения.
Ответ №1:
Dataframe.columns возвращает список столбцов фрейма данных. С помощью этого свойства мы можем получить столбцы, которые являются общими для обоих фреймов данных:
dfA = ...
dfB = ...
#get the common columns
common_cols = [col for col in dfA.columns if col in dfB.columns]
#create a list of join conditions
join_conds = [dfA[col].eqNullSafe(dfB[col]) for col in common_cols]
#combine all join conditions with "or"
cond = join_conds[0]
for c in join_conds[1:]:
cond = cond | c
#use the combined condition in a join
dfA.join(dfB, cond).show()
Комментарии:
1. @ghost . . . Подход с использованием фреймов данных, вероятно, является лучшим решением.
2. @werner, я добавил подход, которому следовал. Это именно тот ответ, который я искал.