Spark: требуется подтверждение подхода при захвате первой и последней даты: в наборе данных

#sql #apache-spark #pyspark #apache-spark-sql

#sql #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных :

 A, B, C, D, 201701, 2020001
A, B, C, D, 201801, 2020002
A, B, C, D, 201901, 2020003
  

ожидаемый результат :

 col_A, col_B, col_C ,col_D, min_week ,max_week, min_month, max_month
A,         B,     C,     D,    201701,  201901,  2020001,  2020003
  

Что я пробовал в pyspark-

 from pyspark.sql import Window
import pyspark.sql.functions as psf

w1 = Window.partitionBy('A','B', 'C', 'D')
.orderBy('WEEK','MONTH')
df_new = df_source
.withColumn("min_week", psf.first("WEEK").over(w1))
.withColumn("max_week", psf.last("WEEK").over(w1))
.withColumn("min_month", psf.first("MONTH").over(w1))
.withColumn("max_month", psf.last("MONTH").over(w1))
  

Что я также пробовал —

 sql_1 = """
select A, B , C, D, first(WEEK) as min_week, 
last(WEEK) as max_week , first(MONTH) as min_month, 
last(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)
  

Используя первый и второй подходы, я получил непротиворечивые результаты.
Будет ли приведенный ниже подход работать для устранения проблемы, с которой столкнулись выше —

 sql_1 = """
select A, B , C, D, min(WEEK) as min_week, 
max(WEEK) as max_week , min(MONTH) as min_month, 
max(MONTH) as max_month from df_source
group by A, B , C, D
order by A, B , C, D
"""
df_new = spark.sql(sql_1)
  

Какой подход идеально работает в pyspark каждый раз?
есть ли какой-либо альтернативный способ

или, третий вариант — лучший способ справиться с этим требованием.

Любые указатели будут полезны.

Комментарии:

1. Что не так в выходных данных, которые вы получаете для последнего запроса?

2. я протестирую и вернусь, является ли этот подход наилучшим подходом

Ответ №1:

Третий подход, который вы предлагаете, будет работать каждый раз. Вы также можете написать это так:

 df
    .groupBy('A', 'B', 'C', 'D')
    .agg(F.min('WEEK').alias('min_week'), F.max('WEEK').alias('max_week'),
         F.min('MONTH').alias('min_month'), F.max('MONTH').alias('max_month'))
    .show()
  

что дает:

  --- --- --- --- -------- -------- --------- --------- 
|  A|  B|  C|  D|min_week|max_week|min_month|max_month|
 --- --- --- --- -------- -------- --------- --------- 
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
 --- --- --- --- -------- -------- --------- --------- 
  

Интересно понять, почему первые два подхода дают непредсказуемые результаты, в то время как третий всегда работает.

Второй подход непредсказуем, потому что spark — это механизм параллельных вычислений. Когда он агрегирует значение, он начинается с агрегирования значения во всех разделах, а затем результаты будут агрегированы два на два. Тем не менее, порядок этих агрегатов не является детерминированным. Это зависит, среди прочего, от порядка выполнения задач, который может меняться при каждой попытке, в частности, если данных много.

Первый подход — это не совсем то, что вы хотите сделать. Оконные функции не будут объединять фрейм данных в одну строку. Они вычислят агрегацию и добавят ее в каждую строку. Вы также допускаете несколько ошибок. Если вы заказываете фрейм данных, по умолчанию spark рассматривает окна в диапазоне от начала окна до текущей строки. Поэтому максимальной будет текущая строка за неделю. Фактически, для вычисления in и max вам не нужно заказывать dataframe. Вы можете просто сделать это так:

 w = Window.partitionBy('A','B', 'C', 'D')
df.select('A', 'B', 'C', 'D',
    F.min('WEEK').over(w).alias('min_week'),
    F.max('WEEK').over(w).alias('max_week'),
    F.min('MONTH').over(w).alias('min_month'),
    F.max('MONTH').over(w).alias('max_month')
).show()
  

что дает правильный результат, но это было не то, что вы ожидали. Но, по крайней мере, вы видите разницу между агрегациями окон и обычными агрегациями.

  --- --- --- --- -------- -------- --------- --------- 
|  A|  B|  C|  D|min_week|max_week|min_month|max_month|
 --- --- --- --- -------- -------- --------- --------- 
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
|  A|  B|  C|  D|  201701|  201901|  2020001|  2020003|
 --- --- --- --- -------- -------- --------- ---------