#pyspark #pyspark-dataframes
#pyspark
Вопрос:
предположим, у меня много фреймов данных с аналогичной структурой, но с разными столбцами. Я хочу объединить их все вместе, как сделать это более простым способом?
например, df1, df2, df3 следующие:
df1
id base1 base2 col1 col2 col3 col4
1 1 100 30 1 2 3
2 2 200 40 2 3 4
3 3 300 20 4 4 5
df2
id base1 base2 col1
5 4 100 15
6 1 99 18
7 2 89 9
df3
id base1 base2 col1 col2
9 2 77 12 3
10 1 89 16 5
11 2 88 10 7
чтобы быть:
id base1 base2 col1 col2 col3 col4
1 1 100 30 1 2 3
2 2 200 40 2 3 4
3 3 300 20 4 4 5
5 4 100 15 NaN NaN NaN
6 1 99 18 NaN NaN NaN
7 2 89 9 NaN NaN NaN
9 2 77 12 3 NaN NaN
10 1 89 16 5 NaN NaN
11 2 88 10 7 NaN NaN
в настоящее время я использую этот код:
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row
def customUnion(df1, df2):
cols1 = df1.columns
cols2 = df2.columns
total_cols = sorted(cols1 list(set(cols2) - set(cols1)))
def expr(mycols, allcols):
def processCols(colname):
if colname in mycols:
return colname
else:
return lit(None).alias(colname)
cols = map(processCols, allcols)
return list(cols)
appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
return appended
df_comb1=customUnion(df1,df2)
df_comb2=customUnion(df_comb1,df3)
однако, если я продолжу создавать новые фреймы данных, такие как df4, df5 и т. Д. (100 )
мой код становится беспорядочным.
есть ли способ закодировать его более простым способом?
Заранее спасибо
Ответ №1:
Вы можете управлять этим с помощью списка фреймов данных и функции, не обязательно статически называть каждый фрейм данных…
dataframes = [df1,df2,df3] # load data frames
Вычислите набор всех возможных столбцов:
all_cols = {i for lst in [df.columns for df in dataframes] for i in lst}
#{'base1', 'base2', 'col1', 'col2', 'col3', 'col4', 'id'}
Функция для добавления недостающих столбцов в DF:
def add_missing_cols(df, cols):
v = df
for col in [c for c in cols if (not c in df.columns)]:
v = v.withColumn(col, f.lit(None))
return v
completed_dfs = [add_missing_cols(df, all_cols) for df in dataframes]
res = completed_dfs[0]
for df in completed_dfs[1:]:
res = res.unionAll(df)
res.show()
--- ----- ----- ---- ---- ---- ----
| id|base1|base2|col1|col2|col3|col4|
--- ----- ----- ---- ---- ---- ----
| 1| 1| 100| 30| 1| 2| 3|
| 2| 2| 200| 40| 2| 3| 4|
| 3| 3| 300| 20| 4| 4| 5|
| 5| 4| 100| 15|null|null|null|
| 6| 1| 99| 18|null|null|null|
| 7| 2| 89| 9|null|null|null|
| 9| 2| 77| 12| 3|null|null|
| 10| 1| 89| 16| 5|null|null|
| 11| 2| 88| 10| 7|null|null|
--- ----- ----- ---- ---- ---- ----
Комментарии:
1. большое спасибо, это именно то, что я ищу. просто протестируйте это, изменив код фреймов данных, он работает идеально!
2. Я выяснил, какие столбцы необходимо отсортировать. в противном случае это вызовет некоторые проблемы. поэтому в конце концов я возвращаюсь к своему старому UDF, используя цикл for