Добавьте столбец в фрейм данных Spark с максимальным значением, которое меньше значения текущей записи

#python #apache-spark #pyspark #databricks

Вопрос:

У меня есть фрейм данных Spark, похожий на следующий:

 id  claim_id                 service_date                  status   product
123 10606134411906233408    2018-09-17T00:00:00.000 0000    PD      blue
123 10606147900401009928    2019-01-24T00:00:00.000 0000    PD      yellow
123 10606160940704723994    2019-05-23T00:00:00.000 0000    RV      yellow
123 10606171648203079553    2019-08-29T00:00:00.000 0000    RJ      blue
123 10606186611407311724    2020-01-13T00:00:00.000 0000    PD      blue
 

Простите меня за то, что я не вставил никакого кода, так как ничего не сработало. Я хочу добавить новый столбец с максимальным значением(service_date) предыдущей строки, где статус-PD, а произведение текущей строки = произведение предыдущей строки.

Это легко сделать с помощью коррелированного подзапроса, но это неэффективно и, кроме того, не выполнимо в Spark, поскольку неэквивалентные соединения не поддерживаются. Также обратите внимание, что ЗАДЕРЖКА не будет работать, потому что мне не всегда требуется непосредственная предыдущая запись (и смещение будет динамическим).

Ожидаемый результат будет примерно таким:

 id  claim_id                 service_date                  status   product     previous_service_date
    123 10606134411906233408    2018-09-17T00:00:00.000 0000    PD      blue
    123 10606147900401009928    2019-01-24T00:00:00.000 0000    PD      yellow
    123 10606160940704723994    2019-05-23T00:00:00.000 0000    RV      yellow      2019-01-24T00:00:00.000 0000
    123 10606171648203079553    2019-08-29T00:00:00.000 0000    RJ      blue        2018-09-17T00:00:00.000 0000
    123 10606186611407311724    2020-01-13T00:00:00.000 0000    PD      blue        2018-09-17T00:00:00.000 0000
 

Ответ №1:

Вы можете попробовать следующее, которое использует max в качестве оконной функции when (выражение case), но фокусируется на предыдущих строках

 from pyspark.sql import functions as F
from pyspark.sql import Window


df = df.withColumn('previous_service_date',F.max(
    F.when(F.col("status")=="PD",F.col("service_date")).otherwise(None)
).over(
    Window.partitionBy("product")
          .rowsBetween(Window.unboundedPreceding,-1)
))

df.orderBy('service_date').show(truncate=False)
 
  --- -------------------- ------------------- ------ ------- --------------------- 
|id |claim_id            |service_date       |status|product|previous_service_date|
 --- -------------------- ------------------- ------ ------- --------------------- 
|123|10606134411906233408|2018-09-17 00:00:00|PD    |blue   |null                 |
|123|10606147900401009928|2019-01-24 00:00:00|PD    |yellow |null                 |
|123|10606160940704723994|2019-05-23 00:00:00|RV    |yellow |2019-01-24 00:00:00  |
|123|10606171648203079553|2019-08-29 00:00:00|RJ    |blue   |2018-09-17 00:00:00  |
|123|10606186611407311724|2020-01-13 00:00:00|PD    |blue   |2018-09-17 00:00:00  |
 --- -------------------- ------------------- ------ ------- --------------------- 
 

Правка 1

Вы также можете использовать last , как указано ниже

 df = df.withColumn('previous_service_date',F.last(
    F.when(F.col("status")=="PD" ,F.col("service_date")).otherwise(None),True
).over(
    Window.partitionBy("product")
          .orderBy('service_date')
          .rowsBetween(Window.unboundedPreceding,-1)
))
 

Дайте мне знать, если это сработает для вас.

Комментарии:

1. Это работает так, как ожидалось: спасибо. Быстрое продолжение; могу ли я провести рефакторинг, чтобы он также соответствовал продукту? Другими словами, укажите предыдущую дату обслуживания, в которой статус «PD», но также укажите, где продукт предыдущей записи соответствует продукту текущей записи?

2. @MichaelBass В настоящее время он делает это с помощью partitionBy("product") . Фильтр, использующий » F. когда(F. col(«статус»)==»PD» ,F. col(«дата обслуживания»))», был применен как таковой, поскольку рассматривались только даты обслуживания со PD статусом. Есть ли у вас другие тестовые примеры/данные, в которых результаты не соответствуют ожидаемым?

3. Извините за мой предыдущий комментарий, вы правы. Я возился с кодом и случайно изменил раздел. Это отлично работает и очень ценится. Когда-нибудь я стану таким же умным, как ты.

Ответ №2:

Вы можете copy перенести свой фрейм данных в новый фрейм данных ( df2 ) и join оба, как показано ниже:

 (df.join(df2, 
         on = [df.Service_date > df2.Service_date,
               df.product == df2.product,
               df2.status == 'PD'],
         how = "left"))
 

Удалите дублированные столбцы и переименуйте df2.Service_date их в previous_service_date

Комментарии:

1. Спасибо за это. Это хорошее решение, но оно не работает в моем сценарии, так как возвращает несколько записей.