Получить статистическую информацию (похожую на описание) для групп лет в столбце

#pandas #apache-spark #pyspark #apache-spark-sql

#pandas #apache-spark #pyspark #apache-spark-sql

Вопрос:

из столбца, подобного дате (году), я получаю значения такого типа:

  -------------------- ------------ 
|         CUSTOMER_ID|yearSelected|
 -------------------- ------------ 
|1                   |        2010|
|2                   |        1992|
|3                   |        1996|
|4                   |        1990|
|5                   |        1984|
 -------------------- ------------ 
 

Теперь мне нужно группировать их каждые пять лет (с 1990 по 1994 год: группа 1, с 1995 по 1996 год: группа 2) и так далее, и для каждой группы получать информацию, как если бы я выполнял df.describe()

Что я пробовал до сих пор:

 df8 = df4.groupBy('yearSelected')
stat_col = conf['by']

output = df8.agg(
    F.count(stat_col).alias("count"),
    F.mean(stat_col).alias("mean"),
    F.min(stat_col).cast(DecimalType(36,2)).alias("min"),
    F.max(stat_col).cast(DecimalType(36,2)).alias("max"),
    F.sum((F.col(stat_col) > 0).cast(DecimalType(36,2))).alias("greaterThan0"),
    F.sum((F.col(stat_col) == 0).cast(DoubleType())).alias("equalTo0"),
    F.sum((F.col(stat_col) < 0).cast(DecimalType(36,2))).alias("lesserThan0"),
).toPandas()
 

Чего не хватает, так это группировки по диапазону лет, которую я еще не решил.

Другая идея заключалась в использовании Windows, но я терплю неудачу:

 windowSpecAgg  = Window.partitionBy("yearSelected").orderBy("yearSelected")
df5 = df4.withColumn("row",row_number().over(windowSpecAgg)) 
  .withColumn("avg", avg(col(conf['by'])).over(windowSpecAgg)) 
  .withColumn("sum", sum(col(conf['by'])).over(windowSpecAgg)) 
  .withColumn("min", min(col(conf['by'])).over(windowSpecAgg)) 
  .withColumn("max", max(col(conf['by'])).over(windowSpecAgg)) 
  .where(col("row")==1).select("yearSelected","avg","sum","min","max")
 

Есть две причины, по которым я терплю неудачу: во-первых, я не могу должным образом завершить эту идею и поместить ее в код, а во-вторых, каждое действие, которое я пытаюсь выполнить (show (), describe () , count ()) для ЭТОГО фрейма данных, а не для похожих, я получаю:

     py4j.protocol.Py4JJavaError: An error occurred while calling o2323.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 47.0 failed 1 times, most recent failure: Lost task 8.0 in stage 47.0 (TID 692, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

...
TypeError: strptime() argument 1 must be str, not None

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$anon$1.read(PythonUDFRunner.scala:81)
 

Вот как я создаю фрейм данных, если это вообще полезно (как я уже сказал, я делаю этот предыдущий шаг, чтобы получить ГОД из столбца, определенного в conf[by] :

 funcDateTransf =  udf(lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
df1 = df.withColumn('dateFormat', date_format(funcDateTransf(col(conf['by'])), 'MM-dd-yyy'))
df1 = df1.withColumn('date_in_dateFormat', 
               to_date(unix_timestamp(col('dateFormat'), 'MM-dd-yyyy').cast("timestamp")))
df3 = df1.select(conf['columnaJoin'],year('date_in_dateFormat').alias('yearSelected'))
#df3.show(5)

df4 = df1.join(df3, df1[conf['columnaJoin']] == df3[conf['columnaJoin']], 'inner')
 

Я не понимаю, почему этот конкретный способ создания фрейма данных доставил бы мне проблемы, а не тогда, когда я не создаю этот дополнительный столбец, но это то, что я наблюдаю прямо сейчас.

Комментарии:

1. udf funcDateTransf выдает ошибку Python TypeError: strptime() argument 1 must be str, not None . Попробуйте заменить udf(lambda x: datetime.strptime(x, '%Y%m%d'), DateType()) на udf(lambda x: datetime.strptime(x, '%Y%m%d') if x is not None else None, DateType()) и проверьте, осталась ли какая-либо ошибка

2. Я немного смущен тем, что вы пытаетесь спросить (извиняюсь, это, вероятно, моя вина). Вы пытаетесь сгруппировать отдельные годы в соответствующий пятилетний диапазон или затрудняетесь с вычислением статистики, аналогичной тем, которые используются describe() в Pandas?

Ответ №1:

Из того, что я понял из вашего вопроса, вы затрудняетесь с тем, как сгруппировать отдельные годы в 5-летние диапазоны. Давайте представим, что вызывается ваш набор dataDF данных. Теперь, чтобы сгруппировать ваши отдельные годы по группам по 5 лет, вы можете использовать:

 from pyspark.sql import functions as F

dataDF.show()
 -------------------- ------------ 
|         CUSTOMER_ID|yearSelected|
 -------------------- ------------ 
|1                   |        2010|
|2                   |        1992|
|3                   |        1996|
|4                   |        1990|
|5                   |        1984|
 -------------------- ------------ 

dataDF.withColumn("new_column",
       F.when((F.col("yearSelected") >= 1980) amp; (F.col("yearSelected") <= 1984), "Group 1"), 
      .when((F.col("yearSelected") >= 1985) amp; (F.col("yearSelected") <= 1989), "Group 2"), 
      .when((F.col("yearSelected") >= 1990) amp; (F.col("yearSelected") <= 1994), "Group 3"), 
      .when((F.col("yearSelected") >= 1995) amp; (F.col("yearSelected") <= 1999), "Group 4"), 
      .when((F.col("yearSelected") >= 2000) amp; (F.col("yearSelected") <= 2004), "Group 5"), 
      .when((F.col("yearSelected") >= 2005) amp; (F.col("yearSelected") <= 2009), "Group 6"), 
      .when((F.col("yearSelected") >= 2010) amp; (F.col("yearSelected") <= 2014), "Group 7"), 
      .otherwise("N/A")).show()
 

Результирующий набор данных будет:

  -------------------- ------------ ------------- 
|         CUSTOMER_ID|yearSelected| new_column  |
 -------------------- ------------ ------------- 
|1                   |        2010|   Group 7   |
|2                   |        1992|   Group 3   |
|3                   |        1996|   Group 4   |
|4                   |        1990|   Group 3   |
|5                   |        1984|   Group 1   |
 -------------------- ------------ -------------              
 

Отсюда вы можете сгруппировать данные new_column и получить статистику, как вы упомянули.

Недостатком этого будет то, что вам придется вручную настраивать, какие диапазоны вы хотите выбрать. Однако, исходя из вашего примера, я предполагаю, что вы не уходите слишком далеко назад во времени, так что это должно сработать 🙂

Комментарии:

1. Большое вам спасибо! Это очень помогает. Есть ли способ сделать создание групп new_column более автоматическим? Я полагаю, что диапазоны от 1900 до 2100, так что это не будет огромным, но немного большим. Я думал использовать некоторую структуру, но синтаксис пока не работает