#pandas #apache-spark #pyspark #apache-spark-sql
#pandas #apache-spark #pyspark #apache-spark-sql
Вопрос:
из столбца, подобного дате (году), я получаю значения такого типа:
-------------------- ------------
| CUSTOMER_ID|yearSelected|
-------------------- ------------
|1 | 2010|
|2 | 1992|
|3 | 1996|
|4 | 1990|
|5 | 1984|
-------------------- ------------
Теперь мне нужно группировать их каждые пять лет (с 1990 по 1994 год: группа 1, с 1995 по 1996 год: группа 2) и так далее, и для каждой группы получать информацию, как если бы я выполнял df.describe()
Что я пробовал до сих пор:
df8 = df4.groupBy('yearSelected')
stat_col = conf['by']
output = df8.agg(
F.count(stat_col).alias("count"),
F.mean(stat_col).alias("mean"),
F.min(stat_col).cast(DecimalType(36,2)).alias("min"),
F.max(stat_col).cast(DecimalType(36,2)).alias("max"),
F.sum((F.col(stat_col) > 0).cast(DecimalType(36,2))).alias("greaterThan0"),
F.sum((F.col(stat_col) == 0).cast(DoubleType())).alias("equalTo0"),
F.sum((F.col(stat_col) < 0).cast(DecimalType(36,2))).alias("lesserThan0"),
).toPandas()
Чего не хватает, так это группировки по диапазону лет, которую я еще не решил.
Другая идея заключалась в использовании Windows, но я терплю неудачу:
windowSpecAgg = Window.partitionBy("yearSelected").orderBy("yearSelected")
df5 = df4.withColumn("row",row_number().over(windowSpecAgg))
.withColumn("avg", avg(col(conf['by'])).over(windowSpecAgg))
.withColumn("sum", sum(col(conf['by'])).over(windowSpecAgg))
.withColumn("min", min(col(conf['by'])).over(windowSpecAgg))
.withColumn("max", max(col(conf['by'])).over(windowSpecAgg))
.where(col("row")==1).select("yearSelected","avg","sum","min","max")
Есть две причины, по которым я терплю неудачу: во-первых, я не могу должным образом завершить эту идею и поместить ее в код, а во-вторых, каждое действие, которое я пытаюсь выполнить (show (), describe () , count ()) для ЭТОГО фрейма данных, а не для похожих, я получаю:
py4j.protocol.Py4JJavaError: An error occurred while calling o2323.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 47.0 failed 1 times, most recent failure: Lost task 8.0 in stage 47.0 (TID 692, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
...
TypeError: strptime() argument 1 must be str, not None
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$anon$1.read(PythonUDFRunner.scala:81)
Вот как я создаю фрейм данных, если это вообще полезно (как я уже сказал, я делаю этот предыдущий шаг, чтобы получить ГОД из столбца, определенного в conf[by]
:
funcDateTransf = udf(lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
df1 = df.withColumn('dateFormat', date_format(funcDateTransf(col(conf['by'])), 'MM-dd-yyy'))
df1 = df1.withColumn('date_in_dateFormat',
to_date(unix_timestamp(col('dateFormat'), 'MM-dd-yyyy').cast("timestamp")))
df3 = df1.select(conf['columnaJoin'],year('date_in_dateFormat').alias('yearSelected'))
#df3.show(5)
df4 = df1.join(df3, df1[conf['columnaJoin']] == df3[conf['columnaJoin']], 'inner')
Я не понимаю, почему этот конкретный способ создания фрейма данных доставил бы мне проблемы, а не тогда, когда я не создаю этот дополнительный столбец, но это то, что я наблюдаю прямо сейчас.
Комментарии:
1. udf
funcDateTransf
выдает ошибку PythonTypeError: strptime() argument 1 must be str, not None
. Попробуйте заменитьudf(lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
наudf(lambda x: datetime.strptime(x, '%Y%m%d') if x is not None else None, DateType())
и проверьте, осталась ли какая-либо ошибка2. Я немного смущен тем, что вы пытаетесь спросить (извиняюсь, это, вероятно, моя вина). Вы пытаетесь сгруппировать отдельные годы в соответствующий пятилетний диапазон или затрудняетесь с вычислением статистики, аналогичной тем, которые используются
describe()
в Pandas?
Ответ №1:
Из того, что я понял из вашего вопроса, вы затрудняетесь с тем, как сгруппировать отдельные годы в 5-летние диапазоны. Давайте представим, что вызывается ваш набор dataDF
данных. Теперь, чтобы сгруппировать ваши отдельные годы по группам по 5 лет, вы можете использовать:
from pyspark.sql import functions as F
dataDF.show()
-------------------- ------------
| CUSTOMER_ID|yearSelected|
-------------------- ------------
|1 | 2010|
|2 | 1992|
|3 | 1996|
|4 | 1990|
|5 | 1984|
-------------------- ------------
dataDF.withColumn("new_column",
F.when((F.col("yearSelected") >= 1980) amp; (F.col("yearSelected") <= 1984), "Group 1"),
.when((F.col("yearSelected") >= 1985) amp; (F.col("yearSelected") <= 1989), "Group 2"),
.when((F.col("yearSelected") >= 1990) amp; (F.col("yearSelected") <= 1994), "Group 3"),
.when((F.col("yearSelected") >= 1995) amp; (F.col("yearSelected") <= 1999), "Group 4"),
.when((F.col("yearSelected") >= 2000) amp; (F.col("yearSelected") <= 2004), "Group 5"),
.when((F.col("yearSelected") >= 2005) amp; (F.col("yearSelected") <= 2009), "Group 6"),
.when((F.col("yearSelected") >= 2010) amp; (F.col("yearSelected") <= 2014), "Group 7"),
.otherwise("N/A")).show()
Результирующий набор данных будет:
-------------------- ------------ -------------
| CUSTOMER_ID|yearSelected| new_column |
-------------------- ------------ -------------
|1 | 2010| Group 7 |
|2 | 1992| Group 3 |
|3 | 1996| Group 4 |
|4 | 1990| Group 3 |
|5 | 1984| Group 1 |
-------------------- ------------ -------------
Отсюда вы можете сгруппировать данные new_column
и получить статистику, как вы упомянули.
Недостатком этого будет то, что вам придется вручную настраивать, какие диапазоны вы хотите выбрать. Однако, исходя из вашего примера, я предполагаю, что вы не уходите слишком далеко назад во времени, так что это должно сработать 🙂
Комментарии:
1. Большое вам спасибо! Это очень помогает. Есть ли способ сделать создание групп new_column более автоматическим? Я полагаю, что диапазоны от 1900 до 2100, так что это не будет огромным, но немного большим. Я думал использовать некоторую структуру, но синтаксис пока не работает