#apache-spark #pyspark #apache-spark-sql #time-series
#apache-искра #пыспарк #apache-spark-sql #временные ряды
Вопрос:
Я хочу сделать форвардную заливку в Pyspark по нескольким столбцам. и если начальное значение столбца «NaN», то замените его на 0. Ниже приведен мой DF, как выглядит.
отметка времени начала | Колонка1 | Колонка2 | Колонка3 | Колонка4 |
---|---|---|---|---|
2020-11-02 08:51:50 | 2 | нулевой | нулевой | нулевой |
2020-11-02 09:14:29 | нулевой | нулевой | нулевой | 40 |
2020-11-02 09:18:32 | нулевой | 4 | 2 | нулевой |
2020-11-02 09:32:42 | 4 | нулевой | нулевой | нулевой |
2020-11-03 13:06:03 | нулевой | нулевой | нулевой | 20 |
2020-11-03 13:10:01 | 6 | нулевой | 4 | нулевой |
2020-11-03 13:54:38 | нулевой | 5 | нулевой | нулевой |
2020-11-03 14:46:25 | нулевой | нулевой | нулевой | нулевой |
2020-11-03 14:57:31 | 7 | нулевой | нулевой | 10 |
2020-11-03 15:07:07 | 8 | 7 | нулевой | нулевой |
Ожидаемый DF будет:
отметка времени начала | Колонка1 | Колонка2 | Колонка3 | Колонка4 |
---|---|---|---|---|
2020-11-02 08:51:50 | 2 | 0 | 0 | 0 |
2020-11-02 09:14:29 | 2 | 0 | 0 | 40 |
2020-11-02 09:18:32 | 2 | 4 | 2 | 40 |
2020-11-02 09:32:42 | 4 | 4 | 2 | 40 |
2020-11-03 13:06:03 | 4 | 4 | 2 | 20 |
2020-11-03 13:10:01 | 6 | 4 | 4 | 20 |
2020-11-03 13:54:38 | 6 | 5 | 4 | 20 |
2020-11-03 14:46:25 | 6 | 5 | 4 | 20 |
2020-11-03 14:57:31 | 7 | 5 | 4 | 10 |
2020-11-03 15:07:07 | 8 | 7 | 4 | 10 |
Ниже приведен код, который я попробовал, который я получил на stackoverflow:
from pyspark.sql import Window from pyspark.sql.functions import last,first from pyspark.sql.functions import col, max as max_, min as min_ import sys def stringReplaceFunc(x, y): return F.when(x != y, x).otherwise(F.lit(None)) # replace with NULL def forwardFillImputer(df, cols=[], partitioner="start_timestamp", value="null"): for i in cols: window = Window .partitionBy(F.month(partitioner)) .orderBy(partitioner) .rowsBetween(-sys.maxsize, 0) df= df .withColumn(i, stringReplaceFunc(F.col(i), value)) fill = F.last(df[i], ignorenulls=True).over(window) df= df.withColumn(i, fill) return df df= forwardFillImputer(df, cols=[i for i in df.columns])
код не работает, пожалуйста, дайте мне знать, какую ошибку я совершаю. Пожалуйста, дайте мне знать, если есть какое-либо альтернативное решение. Спасибо.
Комментарии:
1. @thePurplePython Привет, я нашел ваше решение, не могли бы вы помочь мне с этим.
Ответ №1:
В вашем текущем коде вы не должны разбивать окно по месяцам, и использование rowsBetween
бесполезно. У вас должно быть только заказанное окно для каждого start_timestamp
Более того, вы не управляете случаем, когда нет последнего значения. Вы можете управлять им, используя coalesce
буквальное значение '0'
Таким образом, ваш код может быть переписан следующим образом:
from pyspark.sql import functions as F from pyspark.sql import Window def forwardFillImputer(df, cols=[], partitioner='start_timestamp', value='null'): for c in cols: df = df.withColumn(c, F.when(F.col(c) != value, F.col(c))) df = df.withColumn(c, F.coalesce(F.col(c), F.last(c, True).over(Window.orderBy(partitioner)), F.lit('0'))) return df df = forwardFillImputer(df, df.columns)
со следующим фреймом данных в виде df
:
------------------- ------- ------- ------- ------- |start_timestamp |Column1|Column2|Column3|Column4| ------------------- ------- ------- ------- ------- |2020-11-02 08:51:50|2 |null |null |null | |2020-11-02 09:14:29|null |null |null |40 | |2020-11-02 09:18:32|null |4 |2 |null | |2020-11-02 09:32:42|4 |null |null |null | |2020-11-03 13:06:03|null |null |null |20 | |2020-11-03 13:10:01|6 |null |4 |null | |2020-11-03 13:54:38|null |5 |null |null | |2020-11-03 14:46:25|null |null |null |null | |2020-11-03 14:57:31|7 |null |null |10 | |2020-11-03 15:07:07|8 |7 |null |null | ------------------- ------- ------- ------- -------
Вы получаете следующий результат:
------------------- ------- ------- ------- ------- |start_timestamp |Column1|Column2|Column3|Column4| ------------------- ------- ------- ------- ------- |2020-11-02 08:51:50|2 |0 |0 |0 | |2020-11-02 09:14:29|2 |0 |0 |40 | |2020-11-02 09:18:32|2 |4 |2 |40 | |2020-11-02 09:32:42|4 |4 |2 |40 | |2020-11-03 13:06:03|4 |4 |2 |20 | |2020-11-03 13:10:01|6 |4 |4 |20 | |2020-11-03 13:54:38|6 |5 |4 |20 | |2020-11-03 14:46:25|6 |5 |4 |20 | |2020-11-03 14:57:31|7 |5 |4 |10 | |2020-11-03 15:07:07|8 |7 |4 |10 | ------------------- ------- ------- ------- -------
Комментарии:
1. Спасибо, код сработал, я внес небольшие изменения:
def forwardFillImputer(data, cols=[], partitioner='resampled_start_timestamp', value='null'): for c in cols: data = data.withColumn(c, F.when(F.col(c).isNotNull(), F.col(c))) data = data.withColumn(c, F.coalesce(F.col(c), F.last(c, True).over(Window.orderBy(partitioner)), F.lit('0'))) return data data = forwardFillImputer(data, data.columns)