Pyspark — фильтр, группировка, агрегат для различных комбинаций столбцов и функций

#apache-spark #pyspark #group-by

Вопрос:

У меня есть простая операция, Pyspark но мне нужно выполнить операцию с множеством различных параметров. Это просто фильтр по одному столбцу, затем группировка по другому столбцу и агрегирование по третьему столбцу. В Python , функция является:

 def filter_gby_reduce(df, filter_col = None, filter_value = None):
  return df.filter(col(filter_col) == filter_value).groupby('ID').agg(max('Value'))
 

Предположим, что различные конфигурации

 func_params = spark.createDataFrame([('Day', 'Monday'), ('Month', 'January')], ['feature', 'filter_value'])
 

Я мог бы, конечно, просто запускать функции одну за другой:

 filter_gby_reduce(df, filter_col = 'Day', filter_value = 'Monday')
filter_gby_reduce(df, filter_col = 'Month', filter_value = 'January')
 

Но моя фактическая коллекция параметров намного больше. Наконец, мне также нужно, чтобы union все результаты функции были объединены в один фрейм данных. Итак, есть ли в spark способ написать это более кратко и таким образом, чтобы в полной мере использовать преимущества распараллеливания?

Ответ №1:

Один из способов сделать это-создать нужные значения в виде столбцов, используя when и max и передавая их agg . Если вы хотите, чтобы значения были объединены, вы должны отменить вывод результата с помощью stack (для этого не используется API фреймов данных, поэтому selectExpr используется a). В зависимости от набора данных, который вы можете получить null , если фильтр исключает все данные, при необходимости их можно удалить.

Я бы рекомендовал протестировать это против «наивного» подхода, заключающегося в простом объединении большого количества отфильтрованных кадров данных.

 import pyspark.sql.functions as f
func_params = [('Day', 'Monday'), ('Month', 'January')]

df = spark.createDataFrame([
    ('Monday', 'June', 1, 5), 
    ('Monday', 'January', 1, 2), 
    ('Monday', 'June', 1, 5),
    ('Monday', 'June', 2, 10)], 
    ['Day', 'Month', 'ID', 'Value'])


cols = []
for column, flt in func_params:
    name = f'{column}_{flt}'
    val = f.when(f.col(column) == flt, f.col('Value')).otherwise(None)
    cols.append(f.max(val).alias(name))

stack = f"stack({len(cols)},"   ','.join(f"'{column}_{flt}', {column}_{flt}" for column, flt in func_params)   ')'

(df
    .groupby('ID')
    .agg(*cols)
    .selectExpr('ID', stack)
    .withColumnRenamed('col0', 'param')
    .withColumnRenamed('col1', 'Value')
    .show()
)

 --- ------------- -----                                                        
| ID|        param|Value|
 --- ------------- ----- 
|  1|   Day_Monday|    5|
|  1|Month_January|    2|
|  2|   Day_Monday|   10|
|  2|Month_January| null|
 --- ------------- -----