#apache-spark #pyspark #group-by
Вопрос:
У меня есть простая операция, Pyspark
но мне нужно выполнить операцию с множеством различных параметров. Это просто фильтр по одному столбцу, затем группировка по другому столбцу и агрегирование по третьему столбцу. В Python
, функция является:
def filter_gby_reduce(df, filter_col = None, filter_value = None):
return df.filter(col(filter_col) == filter_value).groupby('ID').agg(max('Value'))
Предположим, что различные конфигурации
func_params = spark.createDataFrame([('Day', 'Monday'), ('Month', 'January')], ['feature', 'filter_value'])
Я мог бы, конечно, просто запускать функции одну за другой:
filter_gby_reduce(df, filter_col = 'Day', filter_value = 'Monday')
filter_gby_reduce(df, filter_col = 'Month', filter_value = 'January')
Но моя фактическая коллекция параметров намного больше. Наконец, мне также нужно, чтобы union
все результаты функции были объединены в один фрейм данных. Итак, есть ли в spark способ написать это более кратко и таким образом, чтобы в полной мере использовать преимущества распараллеливания?
Ответ №1:
Один из способов сделать это-создать нужные значения в виде столбцов, используя when
и max
и передавая их agg
. Если вы хотите, чтобы значения были объединены, вы должны отменить вывод результата с помощью stack
(для этого не используется API фреймов данных, поэтому selectExpr
используется a). В зависимости от набора данных, который вы можете получить null
, если фильтр исключает все данные, при необходимости их можно удалить.
Я бы рекомендовал протестировать это против «наивного» подхода, заключающегося в простом объединении большого количества отфильтрованных кадров данных.
import pyspark.sql.functions as f
func_params = [('Day', 'Monday'), ('Month', 'January')]
df = spark.createDataFrame([
('Monday', 'June', 1, 5),
('Monday', 'January', 1, 2),
('Monday', 'June', 1, 5),
('Monday', 'June', 2, 10)],
['Day', 'Month', 'ID', 'Value'])
cols = []
for column, flt in func_params:
name = f'{column}_{flt}'
val = f.when(f.col(column) == flt, f.col('Value')).otherwise(None)
cols.append(f.max(val).alias(name))
stack = f"stack({len(cols)}," ','.join(f"'{column}_{flt}', {column}_{flt}" for column, flt in func_params) ')'
(df
.groupby('ID')
.agg(*cols)
.selectExpr('ID', stack)
.withColumnRenamed('col0', 'param')
.withColumnRenamed('col1', 'Value')
.show()
)
--- ------------- -----
| ID| param|Value|
--- ------------- -----
| 1| Day_Monday| 5|
| 1|Month_January| 2|
| 2| Day_Monday| 10|
| 2|Month_January| null|
--- ------------- -----