#python #apache-spark #pyspark #databricks
Вопрос:
У меня есть фрейм данных Spark, похожий на следующий:
id claim_id service_date status product
123 10606134411906233408 2018-09-17T00:00:00.000 0000 PD blue
123 10606147900401009928 2019-01-24T00:00:00.000 0000 PD yellow
123 10606160940704723994 2019-05-23T00:00:00.000 0000 RV yellow
123 10606171648203079553 2019-08-29T00:00:00.000 0000 RJ blue
123 10606186611407311724 2020-01-13T00:00:00.000 0000 PD blue
Простите меня за то, что я не вставил никакого кода, так как ничего не сработало. Я хочу добавить новый столбец с максимальным значением(service_date) предыдущей строки, где статус-PD, а произведение текущей строки = произведение предыдущей строки.
Это легко сделать с помощью коррелированного подзапроса, но это неэффективно и, кроме того, не выполнимо в Spark, поскольку неэквивалентные соединения не поддерживаются. Также обратите внимание, что ЗАДЕРЖКА не будет работать, потому что мне не всегда требуется непосредственная предыдущая запись (и смещение будет динамическим).
Ожидаемый результат будет примерно таким:
id claim_id service_date status product previous_service_date
123 10606134411906233408 2018-09-17T00:00:00.000 0000 PD blue
123 10606147900401009928 2019-01-24T00:00:00.000 0000 PD yellow
123 10606160940704723994 2019-05-23T00:00:00.000 0000 RV yellow 2019-01-24T00:00:00.000 0000
123 10606171648203079553 2019-08-29T00:00:00.000 0000 RJ blue 2018-09-17T00:00:00.000 0000
123 10606186611407311724 2020-01-13T00:00:00.000 0000 PD blue 2018-09-17T00:00:00.000 0000
Ответ №1:
Вы можете попробовать следующее, которое использует max
в качестве оконной функции when
(выражение case), но фокусируется на предыдущих строках
from pyspark.sql import functions as F
from pyspark.sql import Window
df = df.withColumn('previous_service_date',F.max(
F.when(F.col("status")=="PD",F.col("service_date")).otherwise(None)
).over(
Window.partitionBy("product")
.rowsBetween(Window.unboundedPreceding,-1)
))
df.orderBy('service_date').show(truncate=False)
--- -------------------- ------------------- ------ ------- ---------------------
|id |claim_id |service_date |status|product|previous_service_date|
--- -------------------- ------------------- ------ ------- ---------------------
|123|10606134411906233408|2018-09-17 00:00:00|PD |blue |null |
|123|10606147900401009928|2019-01-24 00:00:00|PD |yellow |null |
|123|10606160940704723994|2019-05-23 00:00:00|RV |yellow |2019-01-24 00:00:00 |
|123|10606171648203079553|2019-08-29 00:00:00|RJ |blue |2018-09-17 00:00:00 |
|123|10606186611407311724|2020-01-13 00:00:00|PD |blue |2018-09-17 00:00:00 |
--- -------------------- ------------------- ------ ------- ---------------------
Правка 1
Вы также можете использовать last
, как указано ниже
df = df.withColumn('previous_service_date',F.last(
F.when(F.col("status")=="PD" ,F.col("service_date")).otherwise(None),True
).over(
Window.partitionBy("product")
.orderBy('service_date')
.rowsBetween(Window.unboundedPreceding,-1)
))
Дайте мне знать, если это сработает для вас.
Комментарии:
1. Это работает так, как ожидалось: спасибо. Быстрое продолжение; могу ли я провести рефакторинг, чтобы он также соответствовал продукту? Другими словами, укажите предыдущую дату обслуживания, в которой статус «PD», но также укажите, где продукт предыдущей записи соответствует продукту текущей записи?
2. @MichaelBass В настоящее время он делает это с помощью
partitionBy("product")
. Фильтр, использующий » F. когда(F. col(«статус»)==»PD» ,F. col(«дата обслуживания»))», был применен как таковой, поскольку рассматривались только даты обслуживания соPD
статусом. Есть ли у вас другие тестовые примеры/данные, в которых результаты не соответствуют ожидаемым?3. Извините за мой предыдущий комментарий, вы правы. Я возился с кодом и случайно изменил раздел. Это отлично работает и очень ценится. Когда-нибудь я стану таким же умным, как ты.
Ответ №2:
Вы можете copy
перенести свой фрейм данных в новый фрейм данных ( df2
) и join
оба, как показано ниже:
(df.join(df2,
on = [df.Service_date > df2.Service_date,
df.product == df2.product,
df2.status == 'PD'],
how = "left"))
Удалите дублированные столбцы и переименуйте df2.Service_date
их в previous_service_date
Комментарии:
1. Спасибо за это. Это хорошее решение, но оно не работает в моем сценарии, так как возвращает несколько записей.