Использование оконных функций в PySpark для расчета 12-месячных конечных процентилей (percent_rank)

#apache-spark #pyspark #apache-spark-sql

#apache-искра #пыспарк #apache-spark-sql

Вопрос:

Я всю ночь ломал голову над этой проблемой и надеялся получить какую-нибудь помощь.

У нас есть задача прогнозирования задержки рейса для школы, использующая большие, если не большие, данные (~32 млн строк). Я много занимался разработкой функций, потому что считал, что это потенциально очень ценный и хороший способ изучить Spark.

Функция, которую я хотел разработать, заключалась в процентиле времени оборота по маршруту. Например, предположим, что маршрут из Ньюарка (EWR) в О’Хара (ORD), а прибывающий рейс опаздывает и прибывает в 8:00 утра, а его запланированный вылет-8:35 утра. Время выполнения заказа составляет 35 минут. Это много или мало для этого маршрута?

Что я хотел сделать, так это рассчитать процентиль для этого маршрута, используя percent_rank . Но я также хотел использовать конечные 12 месяцев (и, что более важно, нет будущего периода времени, который не был бы доступен в то время). Именно это сочетание критериев сбивает меня с толку. Приведенные ниже данные являются репрезентативными, но имейте в виду, что я разделюсь на несколько маршрутов.

 | ROUTE | CRS_DEP_TIME_UTC | TURNAROUND_EST | |---------|------------------------------|----------------| | EWR-ORD | 2015-07-15T20:38:00.000 0000 | 57 | | EWR-ORD | 2015-07-30T00:00:00.000 0000 | 50 | | EWR-ORD | 2015-08-23T21:40:00.000 0000 | 56 | | EWR-ORD | 2016-02-09T21:29:00.000 0000 | 70 | | EWR-ORD | 2016-02-10T21:29:00.000 0000 | 78 |   

Вот мой код:

 days = lambda i: i * 86400  # this defines the 365 window I want to use  w1 = (Window.partitionBy('ROUTE')   .orderBy(df.CRS_DEP_TIME_UTC.cast('long'))   .rangeBetween(-days(365), 0))  # 1st udf I've created. Not at all sure it's correct.   def percent_rank_udf(aStruct):  return F.udf(lambda x: float(stats.percentileofscore(x.results, x.TURNAROUND_EST)),  FloatType())   df = df.withColumn("results", F.collect_list(F.col("TURNAROUND_EST")).over(w1))   .withColumn("TURN_PCTILE", percent_rank_udf(F.struct(F.col("TURNAROUND_EST"),  F.col("results")))  ).show(truncate = False)  

На самом деле я не уверен, насколько я близок. Я полагаю, что в первой строке (w/collect_list) собрана соответствующая дата ПОВОРОТА для окна 365 дней для этого маршрута и этого рейса. Итак, если я прав, для каждой из строк ~32 м он собирает от нескольких десятков до примерно ~5 тыс. целых чисел для каждого рейса, с медианой, вероятно, в несколько сотен.

Когда я запускаю это, я получаю: AssertionError: col should be Column

Я занимаюсь этим 12 часов, поэтому подумал, что сейчас самое подходящее время обратиться за помощью.

Я видел предположение, что это возможно без использования UDF, но на данный момент любые решения будут приветствоваться.