есть ли более простой способ объединить 100 фреймов данных PySpark с разными столбцами вместе (не объединить, а добавить)

#pyspark #pyspark-dataframes

#pyspark

Вопрос:

предположим, у меня много фреймов данных с аналогичной структурой, но с разными столбцами. Я хочу объединить их все вместе, как сделать это более простым способом?

например, df1, df2, df3 следующие:

df1

    id   base1 base2 col1 col2 col3 col4
   1    1     100   30    1    2    3
   2    2     200   40    2    3    4
   3    3     300   20    4    4    5
  

df2

    id   base1 base2 col1
   5    4     100   15
   6    1     99    18
   7    2     89    9
  

df3

    id   base1 base2 col1 col2
   9    2     77    12    3
   10   1     89    16    5
   11   2     88    10    7
  

чтобы быть:

    id   base1 base2 col1 col2 col3 col4
   1    1     100   30    1    2    3
   2    2     200   40    2    3    4
   3    3     300   20    4    4    5
   5    4     100   15   NaN  NaN  NaN 
   6    1     99    18   NaN  NaN  NaN 
   7    2     89    9    NaN  NaN  NaN 
   9    2     77    12    3   NaN  NaN
   10   1     89    16    5   NaN  NaN
   11   2     88    10    7   NaN  NaN
  

в настоящее время я использую этот код:

 from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1   list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

df_comb1=customUnion(df1,df2)
df_comb2=customUnion(df_comb1,df3)

  

однако, если я продолжу создавать новые фреймы данных, такие как df4, df5 и т. Д. (100 )
мой код становится беспорядочным.

есть ли способ закодировать его более простым способом?

Заранее спасибо

Ответ №1:

Вы можете управлять этим с помощью списка фреймов данных и функции, не обязательно статически называть каждый фрейм данных…

 dataframes = [df1,df2,df3] # load data frames
  

Вычислите набор всех возможных столбцов:

 all_cols = {i for lst in [df.columns for df in dataframes] for i in lst}
#{'base1', 'base2', 'col1', 'col2', 'col3', 'col4', 'id'}
  

Функция для добавления недостающих столбцов в DF:

 def add_missing_cols(df, cols):
    v = df
    for col in [c for c in cols if (not c in df.columns)]:
        v = v.withColumn(col, f.lit(None))
    return v

completed_dfs = [add_missing_cols(df, all_cols) for df in dataframes]

res = completed_dfs[0]
for df in completed_dfs[1:]:
    res = res.unionAll(df)

res.show()
  
  --- ----- ----- ---- ---- ---- ---- 
| id|base1|base2|col1|col2|col3|col4|
 --- ----- ----- ---- ---- ---- ---- 
|  1|    1|  100|  30|   1|   2|   3|
|  2|    2|  200|  40|   2|   3|   4|
|  3|    3|  300|  20|   4|   4|   5|
|  5|    4|  100|  15|null|null|null|
|  6|    1|   99|  18|null|null|null|
|  7|    2|   89|   9|null|null|null|
|  9|    2|   77|  12|   3|null|null|
| 10|    1|   89|  16|   5|null|null|
| 11|    2|   88|  10|   7|null|null|
 --- ----- ----- ---- ---- ---- ---- 
  

Комментарии:

1. большое спасибо, это именно то, что я ищу. просто протестируйте это, изменив код фреймов данных, он работает идеально!

2. Я выяснил, какие столбцы необходимо отсортировать. в противном случае это вызовет некоторые проблемы. поэтому в конце концов я возвращаюсь к своему старому UDF, используя цикл for