#sql #apache-spark #pyspark #apache-spark-sql
#sql #apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть фрейм данных :
A, B, C, D, 201701, 2020001
A, B, C, D, 201801, 2020002
A, B, C, D, 201901, 2020003
ожидаемый результат :
col_A, col_B, col_C ,col_D, min_week ,max_week, min_month, max_month
A, B, C, D, 201701, 201901, 2020001, 2020003
Что я пробовал в pyspark-
from pyspark.sql import Window
import pyspark.sql.functions as psf
w1 = Window.partitionBy('A','B', 'C', 'D')
.orderBy('WEEK','MONTH')
df_new = df_source
.withColumn("min_week", psf.first("WEEK").over(w1))
.withColumn("max_week", psf.last("WEEK").over(w1))
.withColumn("min_month", psf.first("MONTH").over(w1))
.withColumn("max_month", psf.last("MONTH").over(w1))
Что я также пробовал —
sql_1 = """
select A, B , C, D, first(WEEK) as min_week,
last(WEEK) as max_week , first(MONTH) as min_month,
last(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)
Используя первый и второй подходы, я получил непротиворечивые результаты.
Будет ли приведенный ниже подход работать для устранения проблемы, с которой столкнулись выше —
sql_1 = """
select A, B , C, D, min(WEEK) as min_week,
max(WEEK) as max_week , min(MONTH) as min_month,
max(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)
Какой подход идеально работает в pyspark каждый раз?
есть ли какой-либо альтернативный способ
или, третий вариант — лучший способ справиться с этим требованием.
Любые указатели будут полезны.
Комментарии:
1. Что не так в выходных данных, которые вы получаете для последнего запроса?
2. я протестирую и вернусь, является ли этот подход наилучшим подходом
Ответ №1:
Третий подход, который вы предлагаете, будет работать каждый раз. Вы также можете написать это так:
df
.groupBy('A', 'B', 'C', 'D')
.agg(F.min('WEEK').alias('min_week'), F.max('WEEK').alias('max_week'),
F.min('MONTH').alias('min_month'), F.max('MONTH').alias('max_month'))
.show()
что дает:
--- --- --- --- -------- -------- --------- ---------
| A| B| C| D|min_week|max_week|min_month|max_month|
--- --- --- --- -------- -------- --------- ---------
| A| B| C| D| 201701| 201901| 2020001| 2020003|
--- --- --- --- -------- -------- --------- ---------
Интересно понять, почему первые два подхода дают непредсказуемые результаты, в то время как третий всегда работает.
Второй подход непредсказуем, потому что spark — это механизм параллельных вычислений. Когда он агрегирует значение, он начинается с агрегирования значения во всех разделах, а затем результаты будут агрегированы два на два. Тем не менее, порядок этих агрегатов не является детерминированным. Это зависит, среди прочего, от порядка выполнения задач, который может меняться при каждой попытке, в частности, если данных много.
Первый подход — это не совсем то, что вы хотите сделать. Оконные функции не будут объединять фрейм данных в одну строку. Они вычислят агрегацию и добавят ее в каждую строку. Вы также допускаете несколько ошибок. Если вы заказываете фрейм данных, по умолчанию spark рассматривает окна в диапазоне от начала окна до текущей строки. Поэтому максимальной будет текущая строка за неделю. Фактически, для вычисления in и max вам не нужно заказывать dataframe. Вы можете просто сделать это так:
w = Window.partitionBy('A','B', 'C', 'D')
df.select('A', 'B', 'C', 'D',
F.min('WEEK').over(w).alias('min_week'),
F.max('WEEK').over(w).alias('max_week'),
F.min('MONTH').over(w).alias('min_month'),
F.max('MONTH').over(w).alias('max_month')
).show()
что дает правильный результат, но это было не то, что вы ожидали. Но, по крайней мере, вы видите разницу между агрегациями окон и обычными агрегациями.
--- --- --- --- -------- -------- --------- ---------
| A| B| C| D|min_week|max_week|min_month|max_month|
--- --- --- --- -------- -------- --------- ---------
| A| B| C| D| 201701| 201901| 2020001| 2020003|
| A| B| C| D| 201701| 201901| 2020001| 2020003|
| A| B| C| D| 201701| 201901| 2020001| 2020003|
--- --- --- --- -------- -------- --------- ---------