Группируйте записи в фрейме данных и отображайте все столбцы с помощью PySpark

#python #dataframe #apache-spark #pyspark #group-by

Вопрос:

У меня есть следующий фрейм данных

 dataframe - columnA, columnB, columnC, columnD, columnE
 

Я хочу сгруппировать ColumnC, а затем рассмотреть максимальное значение ColumnE

 dataframe .select('*').groupBy('columnC').max('columnE')
 

ожидаемый результат

 dataframe - columnA, columnB, columnC, columnD, columnE
 

Реальный объем производства

 dataframe - columnC, columnE
 

Почему все столбцы во фрейме данных отображаются не так, как ожидалось ?

Комментарии:

1. Почему вы ожидаете, что будут отображаться все столбцы, если вы агрегировали данные только по одному столбцу в каждой группе?

2. @It_is_Chris, потому что, насколько я понимаю, функции groupBy и aggregate max() ведут себя так же, как MYSQL. Я хочу сгруппировать данные по столбцам, а затем из этого результата хочу получить строки с максимальным значением colunE

Ответ №1:

Для версии Spark >= 3.0.0 вы можете использовать >max_by для выбора дополнительных столбцов.

 import random
from pyspark.sql import functions as F

#create some testdata
df = spark.createDataFrame(
  [[random.randint(1,3)]   random.sample(range(0, 30), 4)  for _ in range(10)], 
  schema=["columnC", "columnB", "columnA", "columnD", "columnE"]) 
  .select("columnA", "columnB", "columnC", "columnD", "columnE")

df.groupBy("columnC") 
  .agg(F.max("columnE"), 
      F.expr("max_by(columnA, columnE) as columnA"),
      F.expr("max_by(columnB, columnE) as columnB"),
      F.expr("max_by(columnD, columnE) as columnD")) 
  .show()
 

Для тестовых данных

  ------- ------- ------- ------- ------- 
|columnA|columnB|columnC|columnD|columnE|
 ------- ------- ------- ------- ------- 
|     25|     20|      2|      0|      2|
|     14|      2|      2|     24|      6|
|     26|     13|      3|      2|      1|
|      5|     24|      3|     19|     17|
|     22|      5|      3|     14|     21|
|     24|      5|      1|      8|      4|
|      7|     22|      3|     16|     20|
|      6|     17|      1|      5|      7|
|     24|     22|      2|      8|      3|
|      4|     14|      1|     16|     11|
 ------- ------- ------- ------- ------- 
 

в результате получается

  ------- ------------ ------- ------- ------- 
|columnC|max(columnE)|columnA|columnB|columnD|
 ------- ------------ ------- ------- ------- 
|      1|          11|      4|     14|     16|
|      3|          21|     22|      5|     14|
|      2|           6|     14|      2|     24|
 ------- ------------ ------- ------- ------- 
 

Комментарии:

1. Даже если columnA, ColumnB, ColumnD имеют строковые значения, этот способ можно использовать правильно?

Ответ №2:

То, чего вы хотите достичь, можно сделать с помощью функции ОКНА. Не групповая

  • разделите ваши данные по столбцам.
  • Упорядочивайте свои данные в каждом разделе в desc (ранг)
  • отфильтруйте желаемый результат.
 from pyspark.sql.window import Window
from pyspark.sql.functions import rank
from pyspark.sql.functions import col

windowSpec  = Window.partitionBy("columnC").orderBy(col("columnE").desc())

expectedDf = df.withColumn("rank", rank().over(windowSpec)) 
    .filter(col("rank") == 1)
 

Возможно, вы захотите изменить свой вопрос.