рекурсивная операция над одним и тем же столбцом в Pyspark

#apache-spark #pyspark #iteration

Вопрос:

У меня есть фрейм данных, такой:

Фрейм данных:

 |SEQ_ID |TIME_STAMP             |_MS               |
 ------- ----------------------- ------------------ 
|3879826|2021-07-29 11:24:20.525|NaN               |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|
|3879826|2021-07-29 11:27:43.264|27.247600203353613|
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|
|3879826|2021-07-29 11:36:19.128|13.011968111650264|
|3879826|2021-07-29 11:38:10.919|17.762006254598797|
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|
 

когда _MS >=3 и когда предыдущее > _MS меньше текущего _MS , я хочу увеличить новый столбец drift_MS на 100. Но если _MS < 3 и предыдущий _MS < текущий _MS , я хочу увеличить drift_MS на 1. Если ни одно из условий не удовлетворяет, я хочу установить значение 0

Ожидаемый результат:

 |SEQ_ID |TIME_STAMP             |_MS               |drift_MS|
 ------- ----------------------- ------------------ -------- 
|3879826|2021-07-29 11:24:20.525|NaN               |0       |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|0       |
|3879826|2021-07-29 11:27:43.264|27.247600203353613|100     |
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |0       |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |0       |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|1       |
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|2       |
|3879826|2021-07-29 11:36:19.128|13.011968111650264|102     |
|3879826|2021-07-29 11:38:10.919|17.762006254598797|202     |
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|0       |
 

У меня была другая версия этого вопроса, в которой я просто хотел сохранить прежнее значение прежним, и очень полезный участник предложил мне использовать функцию sum следующим образом;

 import pyspark.sql.functions as f

w1=Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
    
prev_MS = (f.lag(col('_MS'),1).over(w1))
df.withColumn('drift_MS', 
  f.sum(
    when((col('_MS') < 3) amp; (prev_MS < col('_MS')), 1)
    .when((col('_MS') >= 3) amp; (prev_MS < col('_MS')), 100)
    .otherwise(0)
 ).over(w1))
 

Это отлично работает, когда я хочу, чтобы предыдущее drift_MS значение оставалось прежним, если ни одно из условий не выполнено. Однако теперь мне нужно сбросить его до нуля, если условия не будут выполнены.
Я пытался понять это, но я все время натыкаюсь на стену, где мне нужно будет итеративно возвращаться к предыдущей строке, что обычно не делается в pyspark или больших данных, поскольку это наиболее эффективно при операциях по столбцам

The following code does not work for me:

 import pyspark.sql.functions as f

w1=Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
prev_drift_MS_temp = (f.lag(col('drift_MS_temp'),1).over(w1))
prev_drift_MS = (f.lag(col('drift_MS'),1).over(w1))
    
prev_MS = (f.lag(col('_MS'),1).over(w1))
df.withColumn('drift_MS_temp', 
  f.sum(
    when((col('_MS') < 3) amp; (prev_MS < col('_MS')), 1)
    .when((col('_MS') >= 3) amp; (prev_MS < col('_MS')), 100)
    .otherwise(0)
 ).over(w1))
  .withColumn('drift_MS',when(prev_drift_MS_temp==col('drift_MS_temp'),0)
  .otherwise(col('drift_MS_temp') - prev_drift_MS_temp   prev_drift_MS))
 

Есть какие-нибудь мысли о том, как я могу это сделать?

ОБНОВЛЕНИЕ: Итак, после того, как я ломаю голову над этим, лучшая логика, которую я придумал до сих пор, состоит в том, чтобы создать другой столбец, drift_MS а затем получить условную совокупную сумму, когда столбец различий не 0 так похож на это:

 |SEQ_ID |TIME_STAMP             |_MS               |drift_MS|_diff   |drift   |
 ------- ----------------------- ------------------ -------- -------- -------- 
|3879826|2021-07-29 11:24:20.525|NaN               |0       |0       |0       |
|3879826|2021-07-29 11:25:56.934|21.262409581399556|0       |0       |0       |
|3879826|2021-07-29 11:27:43.264|27.247600203353613|100     |100     |100     |
|3879826|2021-07-29 11:29:27.613|18.13528511851038 |100     |0       |0       |
|3879826|2021-07-29 11:31:10.512|2.520896614376871 |100     |0       |0       |
|3879826|2021-07-29 11:32:54.252|2.7081931585605541|101     |1       |1       |
|3879826|2021-07-29 11:34:36.995|2.9832290627235505|102     |1       |1       |
|3879826|2021-07-29 11:36:19.128|13.011968111650264|202     |100     |102     |
|3879826|2021-07-29 11:38:10.919|17.762006254598797|302     |100     |202     |
|3879826|2021-07-29 11:40:01.929|1.9661930950977457|302     |0       |0       |
 

Псевдокод, который я бы представил, выглядел бы примерно так:

 import pyspark.sql.functions as f

w1=Window.partitionBy('SEQ_ID').orderBy(col('TIME_STAMP').asc())
prev_drift_MS = (f.lag(col('drift_MS'),1).over(w1))
prev_diff= (f.lag(col('_diff'),1).over(w1))

prev_MS = (f.lag(col('_MS'),1).over(w1))
df.withColumn('drift_MS', 
  f.sum(
    when((col('_MS') < 3) amp; (prev_MS < col('_MS')), 1)
    .when((col('_MS') >= 3) amp; (prev_MS < col('_MS')), 100)
    .otherwise(0)
 ).over(w1))
 .withColumn('_diff', prev_drift_MS - col('drift_MS'))
 .withColumn('drift', when(prev_diff==0, 0).otherwise(f.sum(col('drift')).over(w1)))
 

Каков правильный синтаксис, чтобы сделать это таким образом?

Комментарии:

1. Не могли бы вы объяснить, почему |3879826|2021-07-29 11:32:54.252|2.7081931585605541|100 | для начала эта строка получает 100? так как _MS меньше 3

2. @anky Большое вам спасибо за то, что указали на это. Мои извинения. Я исправил ожидаемый результат

Ответ №1:

Одним из вариантов, который мы можем использовать, было бы создание группы вспомогательных столбцов перед получением последнего drift_MS столбца. Давайте попробуем сделать это шаг за шагом.

  1. Создайте столбец x , применив те дополнительные условия, которые вы определили.
  2. Создайте столбец y в качестве флага, где значения в столбце сбрасываются до нуля x .
  3. Создайте столбец z для группировки строк между флагами. Мы можем использовать кумулятивную сумму внутри строк между текущей строкой и неограниченными следующими строками.
  4. Наконец, создайте столбец drift_MS в виде совокупной суммы сгруппированных строк по SEQ_ID и вспомогательному столбцу z , упорядоченному по TIME_STAMP .

Эти шаги, введенные в код, будут выглядеть следующим образом (их легче читать в выражениях SQL):

 import pyspark.sql.functions as F

expr_x = F.expr("""
    case 
    when _MS >= 3 AND lag(_MS) over (partition by SEQ_ID  order by TIME_STAMP) < _MS then 100
    when _MS < 3 AND lag(_MS) over (partition by SEQ_ID order by TIME_STAMP) < _MS then 1
    else 0 end  """)

expr_y = F.expr("""
    case 
    when x <> 0 and lead(x) over (partition by SEQ_ID order by TIME_STAMP) = 0 then 1
    else null end """)

expr_z = F.expr("""
    sum(y) over(partition by SEQ_ID 
                order by TIME_STAMP 
                rows between 0 preceding and unbounded following) """)

expr_drift = F.expr("""
    sum(x) over (partition by SEQ_ID, z 
                 order by TIME_STAMP 
                 rows between unbounded preceding and 0 following) """)

df = (df
      .withColumn('x', expr_x)
      .withColumn('y', expr_y)
      .withColumn('z', expr_z)
      .withColumn("drift_MS", expr_drift))
df.show()

#  ------- -------------------- ------------------ --- ---- ---- -------- 
# | SEQ_ID|          TIME_STAMP|               _MS|  x|   y|   z|drift_MS|
#  ------- -------------------- ------------------ --- ---- ---- -------- 
# |3879826|2021-07-29 11:24:...|               NaN|  0|null|   2|       0|
# |3879826|2021-07-29 11:25:...|21.262409581399556|  0|null|   2|       0|
# |3879826|2021-07-29 11:27:...|27.247600203353613|100|   1|   2|     100|
# |3879826|2021-07-29 11:29:...| 18.13528511851038|  0|null|   1|       0|
# |3879826|2021-07-29 11:31:...| 2.520896614376871|  0|null|   1|       0|
# |3879826|2021-07-29 11:32:...| 2.708193158560554|  1|null|   1|       1|
# |3879826|2021-07-29 11:34:...|2.9832290627235505|  1|null|   1|       2|
# |3879826|2021-07-29 11:36:...|13.011968111650264|100|null|   1|     102|
# |3879826|2021-07-29 11:38:...|  17.7620062545988|100|   1|   1|     202|
# |3879826|2021-07-29 11:40:...|1.9661930950977458|  0|null|null|       0|
#  ------- -------------------- ------------------ --- ---- ---- -------- 
 

Комментарии:

1. Спасибо вам за решение! Я сделал шаги 1 и 2, но застрял на шаге 3. Это помогает!

2. Не могли бы вы объяснить немного подробнее об этой rows between unbounded preceding and 0 following роли? Я понимаю, что это будет суммировать строки из первой строки раздела в текущую строку. Но если я упорядочиваю метку времени по возрастанию, не будет ли первая строка и текущая строка одинаковыми?

3. правильно, этот кадр будет работать со всеми значениями от первой строки до текущей строки. Что касается вопроса, я не уверен, правильно ли я его понимаю, но первая строка = текущая строка верна только для первой строки раздела.

4. Я предполагаю, что мой вопрос касается drift_MS столбца, почему бы также не сделать накопительную сумму rows between 0 preceding and unbounded following , как вы сделали для z столбца?

5. да, правильно, потому что нам нужен столбец z для группировки, чтобы получить drift_MS