#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
У меня есть следующие данные в csv:
---- ---------- -----
|name| timestamp|value|
---- ---------- -----
| A|1604219844| 7|
| A|1604219845| 1|
| A|1604219846| 1|
| A|1604219847| 1|
| A|1604219848| 2|
| A|1604219849| 7|
| A|1604219850| 1|
| A|1604219851| 1|
| A|1604219852| 2|
| A|1604219853| 7|
| A|1604219854| 1|
| A|1604219855| 1|
---- ---------- -----
Чтобы отслеживать изменения в последовательном значении, я до сих пор реализовал следующий код :
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as psf
spark = SparkSession.builder.getOrCreate()
data = spark.read.option("header","true").csv("sample_data.csv")
data.show()
w = Window.orderBy("timestamp")
value_lag = lag('value').over(w)
timestamp_lag = lag('timestamp').over(w)
df_final = data.withColumn('prev_timestamp', timestamp_lag).withColumn('prev_value', value_lag)
.withColumn("changed", (data.value != psf.lag('value').over(w)).cast('int'))
.withColumn("diff",data.value - psf.lag('value').over(w))
df_final.show()
Вывод выглядит следующим образом из приведенного выше кода:
---- ---------- ----- -------------- ---------- ------- ----
|name| timestamp|value|prev_timestamp|prev_value|changed|diff|
---- ---------- ----- -------------- ---------- ------- ----
| A|1604219844| 7| null| null| null|null|
| A|1604219845| 1| 1604219844| 7| 1|-6.0|
| A|1604219846| 1| 1604219845| 1| 0| 0.0|
| A|1604219847| 1| 1604219846| 1| 0| 0.0|
| A|1604219848| 2| 1604219847| 1| 1| 1.0|
| A|1604219849| 7| 1604219848| 2| 1| 5.0|
| A|1604219850| 1| 1604219849| 7| 1|-6.0|
| A|1604219851| 1| 1604219850| 1| 0| 0.0|
| A|1604219852| 2| 1604219851| 1| 1| 1.0|
| A|1604219853| 7| 1604219852| 2| 1| 5.0|
| A|1604219854| 1| 1604219853| 7| 1|-6.0|
| A|1604219855| 1| 1604219854| 1| 0| 0.0|
---- ---------- ----- -------------- ---------- ------- ----
Я хочу реализовать дополнительный столбец в приведенном выше фрейме данных для кластеризации изменений 1 -> 2 -> 7 -> 1
, происходящих последовательно
---- ---------- ----- -------------- ---------- ------- ---- ---------------
|name| timestamp|value|prev_timestamp|prev_value|changed|diff| keyword|
---- ---------- ----- -------------- ---------- ------- ---- ---------------
| A|1604219844| 7| null| null| null|null| null|
| A|1604219845| 1| 1604219844| 7| 1| -6|Insert1-Update1|
| A|1604219846| 1| 1604219845| 1| 0| 0| null|
| A|1604219847| 1| 1604219846| 1| 0| 0| null|
| A|1604219848| 2| 1604219847| 1| 1| 1| Insert2|
| A|1604219849| 7| 1604219848| 2| 1| 5|Insert2-Update1|
| A|1604219850| 1| 1604219849| 7| 1| -6|Insert2-Update2|
| A|1604219851| 1| 1604219850| 1| 0| 0| null|
| A|1604219852| 2| 1604219851| 1| 1| 1| Insert3|
| A|1604219853| 7| 1604219852| 2| 1| 5|Insert3-Update1|
| A|1604219854| 1| 1604219853| 7| 1| -6|Insert3-Update2|
| A|1604219855| 1| 1604219854| 1| 0| 0| null|
---- ---------- ----- -------------- ---------- ------- ---- ---------------
Идея этого столбца будет заключаться в том, что он может помочь сгруппировать столбцы на основе ключа Insert2,Insert3
, а затем внутри каждого из них Insert2-Update1,Insert2-Update2
, чтобы конечный кластеризованный результат, который необходимо достичь, был
---- ---------- ----- -------------- ---------- ------------------------ ------------------------ ------------------------
|name| timestamp|value|prev_timestamp|prev_value|changed_timestamp_1_to_2|changed_timestamp_2_to_7|changed_timestamp_7_to_1|
---- ---------- ----- -------------- ---------- ------------------------ ------------------------ ------------------------
| A|1604219848| 2| 1604219847| 1| 1604219848| 1604219849| 1604219850|
| A|1604219852| 2| 1604219851| 1| 1604219852| 1604219853| 1604219854|
---- ---------- ----- -------------- ---------- ------------------------ ------------------------ ------------------------
Любая помощь приветствуется
Комментарии:
1. У вас есть другие значения имен? Пример кластеризации не очень хорошо объяснен.
2. @thebluephantom. какая часть неясна? . дело в том, что я хочу фиксировать временную метку, когда значение изменяется с 1 на 2,2 на 7, а затем с 7 на 1. У меня есть отдельная строка для каждого изменения
3. так, например, всегда ли имя A?
4. да .. у меня 30 столбцов, но если это работает для одного column..it легко реплицируется для других столбцов.