Изменение значения кластера в столбце в pyspark

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

У меня есть следующие данные в csv:

  ---- ---------- ----- 
|name| timestamp|value|
 ---- ---------- ----- 
|   A|1604219844|    7|
|   A|1604219845|    1|
|   A|1604219846|    1|
|   A|1604219847|    1|
|   A|1604219848|    2|
|   A|1604219849|    7|
|   A|1604219850|    1|
|   A|1604219851|    1|
|   A|1604219852|    2|
|   A|1604219853|    7|
|   A|1604219854|    1|
|   A|1604219855|    1|
 ---- ---------- ----- 

  

Чтобы отслеживать изменения в последовательном значении, я до сих пор реализовал следующий код :

 import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as psf

spark = SparkSession.builder.getOrCreate()

data = spark.read.option("header","true").csv("sample_data.csv")
data.show()

w = Window.orderBy("timestamp")
value_lag = lag('value').over(w)
timestamp_lag = lag('timestamp').over(w)

df_final = data.withColumn('prev_timestamp', timestamp_lag).withColumn('prev_value', value_lag)
.withColumn("changed", (data.value != psf.lag('value').over(w)).cast('int'))
.withColumn("diff",data.value - psf.lag('value').over(w))

df_final.show()
  

Вывод выглядит следующим образом из приведенного выше кода:

  ---- ---------- ----- -------------- ---------- ------- ---- 
|name| timestamp|value|prev_timestamp|prev_value|changed|diff|
 ---- ---------- ----- -------------- ---------- ------- ---- 
|   A|1604219844|    7|          null|      null|   null|null|
|   A|1604219845|    1|    1604219844|         7|      1|-6.0|
|   A|1604219846|    1|    1604219845|         1|      0| 0.0|
|   A|1604219847|    1|    1604219846|         1|      0| 0.0|
|   A|1604219848|    2|    1604219847|         1|      1| 1.0|
|   A|1604219849|    7|    1604219848|         2|      1| 5.0|
|   A|1604219850|    1|    1604219849|         7|      1|-6.0|
|   A|1604219851|    1|    1604219850|         1|      0| 0.0|
|   A|1604219852|    2|    1604219851|         1|      1| 1.0|
|   A|1604219853|    7|    1604219852|         2|      1| 5.0|
|   A|1604219854|    1|    1604219853|         7|      1|-6.0|
|   A|1604219855|    1|    1604219854|         1|      0| 0.0|
 ---- ---------- ----- -------------- ---------- ------- ---- 
  

Я хочу реализовать дополнительный столбец в приведенном выше фрейме данных для кластеризации изменений 1 -> 2 -> 7 -> 1 , происходящих последовательно

  ---- ---------- ----- -------------- ---------- ------- ---- --------------- 
|name| timestamp|value|prev_timestamp|prev_value|changed|diff|        keyword|
 ---- ---------- ----- -------------- ---------- ------- ---- --------------- 
|   A|1604219844|    7|          null|      null|   null|null|           null|
|   A|1604219845|    1|    1604219844|         7|      1|  -6|Insert1-Update1|
|   A|1604219846|    1|    1604219845|         1|      0|   0|           null|
|   A|1604219847|    1|    1604219846|         1|      0|   0|           null|
|   A|1604219848|    2|    1604219847|         1|      1|   1|        Insert2|
|   A|1604219849|    7|    1604219848|         2|      1|   5|Insert2-Update1|
|   A|1604219850|    1|    1604219849|         7|      1|  -6|Insert2-Update2|
|   A|1604219851|    1|    1604219850|         1|      0|   0|           null|
|   A|1604219852|    2|    1604219851|         1|      1|   1|        Insert3|
|   A|1604219853|    7|    1604219852|         2|      1|   5|Insert3-Update1|
|   A|1604219854|    1|    1604219853|         7|      1|  -6|Insert3-Update2|
|   A|1604219855|    1|    1604219854|         1|      0|   0|           null|
 ---- ---------- ----- -------------- ---------- ------- ---- --------------- 
  

Идея этого столбца будет заключаться в том, что он может помочь сгруппировать столбцы на основе ключа Insert2,Insert3 , а затем внутри каждого из них Insert2-Update1,Insert2-Update2 , чтобы конечный кластеризованный результат, который необходимо достичь, был

  ---- ---------- ----- -------------- ---------- ------------------------ ------------------------ ------------------------ 
|name| timestamp|value|prev_timestamp|prev_value|changed_timestamp_1_to_2|changed_timestamp_2_to_7|changed_timestamp_7_to_1|
 ---- ---------- ----- -------------- ---------- ------------------------ ------------------------ ------------------------ 
|   A|1604219848|    2|    1604219847|         1|              1604219848|              1604219849|              1604219850|
|   A|1604219852|    2|    1604219851|         1|              1604219852|              1604219853|              1604219854|
 ---- ---------- ----- -------------- ---------- ------------------------ ------------------------ ------------------------ 
  

Любая помощь приветствуется

Комментарии:

1. У вас есть другие значения имен? Пример кластеризации не очень хорошо объяснен.

2. @thebluephantom. какая часть неясна? . дело в том, что я хочу фиксировать временную метку, когда значение изменяется с 1 на 2,2 на 7, а затем с 7 на 1. У меня есть отдельная строка для каждого изменения

3. так, например, всегда ли имя A?

4. да .. у меня 30 столбцов, но если это работает для одного column..it легко реплицируется для других столбцов.