#apache-spark #pyspark #apache-spark-sql
#apache-искра #пыспарк #apache-spark-sql
Вопрос:
Я всю ночь ломал голову над этой проблемой и надеялся получить какую-нибудь помощь.
У нас есть задача прогнозирования задержки рейса для школы, использующая большие, если не большие, данные (~32 млн строк). Я много занимался разработкой функций, потому что считал, что это потенциально очень ценный и хороший способ изучить Spark.
Функция, которую я хотел разработать, заключалась в процентиле времени оборота по маршруту. Например, предположим, что маршрут из Ньюарка (EWR) в О’Хара (ORD), а прибывающий рейс опаздывает и прибывает в 8:00 утра, а его запланированный вылет-8:35 утра. Время выполнения заказа составляет 35 минут. Это много или мало для этого маршрута?
Что я хотел сделать, так это рассчитать процентиль для этого маршрута, используя percent_rank
. Но я также хотел использовать конечные 12 месяцев (и, что более важно, нет будущего периода времени, который не был бы доступен в то время). Именно это сочетание критериев сбивает меня с толку. Приведенные ниже данные являются репрезентативными, но имейте в виду, что я разделюсь на несколько маршрутов.
| ROUTE | CRS_DEP_TIME_UTC | TURNAROUND_EST | |---------|------------------------------|----------------| | EWR-ORD | 2015-07-15T20:38:00.000 0000 | 57 | | EWR-ORD | 2015-07-30T00:00:00.000 0000 | 50 | | EWR-ORD | 2015-08-23T21:40:00.000 0000 | 56 | | EWR-ORD | 2016-02-09T21:29:00.000 0000 | 70 | | EWR-ORD | 2016-02-10T21:29:00.000 0000 | 78 |
Вот мой код:
days = lambda i: i * 86400 # this defines the 365 window I want to use w1 = (Window.partitionBy('ROUTE') .orderBy(df.CRS_DEP_TIME_UTC.cast('long')) .rangeBetween(-days(365), 0)) # 1st udf I've created. Not at all sure it's correct. def percent_rank_udf(aStruct): return F.udf(lambda x: float(stats.percentileofscore(x.results, x.TURNAROUND_EST)), FloatType()) df = df.withColumn("results", F.collect_list(F.col("TURNAROUND_EST")).over(w1)) .withColumn("TURN_PCTILE", percent_rank_udf(F.struct(F.col("TURNAROUND_EST"), F.col("results"))) ).show(truncate = False)
На самом деле я не уверен, насколько я близок. Я полагаю, что в первой строке (w/collect_list) собрана соответствующая дата ПОВОРОТА для окна 365 дней для этого маршрута и этого рейса. Итак, если я прав, для каждой из строк ~32 м он собирает от нескольких десятков до примерно ~5 тыс. целых чисел для каждого рейса, с медианой, вероятно, в несколько сотен.
Когда я запускаю это, я получаю: AssertionError: col should be Column
Я занимаюсь этим 12 часов, поэтому подумал, что сейчас самое подходящее время обратиться за помощью.
Я видел предположение, что это возможно без использования UDF, но на данный момент любые решения будут приветствоваться.