Как сделать форвардное заполнение Pyspark в нескольких столбцах

#apache-spark #pyspark #apache-spark-sql #time-series

#apache-искра #пыспарк #apache-spark-sql #временные ряды

Вопрос:

Я хочу сделать форвардную заливку в Pyspark по нескольким столбцам. и если начальное значение столбца «NaN», то замените его на 0. Ниже приведен мой DF, как выглядит.

отметка времени начала Колонка1 Колонка2 Колонка3 Колонка4
2020-11-02 08:51:50 2 нулевой нулевой нулевой
2020-11-02 09:14:29 нулевой нулевой нулевой 40
2020-11-02 09:18:32 нулевой 4 2 нулевой
2020-11-02 09:32:42 4 нулевой нулевой нулевой
2020-11-03 13:06:03 нулевой нулевой нулевой 20
2020-11-03 13:10:01 6 нулевой 4 нулевой
2020-11-03 13:54:38 нулевой 5 нулевой нулевой
2020-11-03 14:46:25 нулевой нулевой нулевой нулевой
2020-11-03 14:57:31 7 нулевой нулевой 10
2020-11-03 15:07:07 8 7 нулевой нулевой

Ожидаемый DF будет:

отметка времени начала Колонка1 Колонка2 Колонка3 Колонка4
2020-11-02 08:51:50 2 0 0 0
2020-11-02 09:14:29 2 0 0 40
2020-11-02 09:18:32 2 4 2 40
2020-11-02 09:32:42 4 4 2 40
2020-11-03 13:06:03 4 4 2 20
2020-11-03 13:10:01 6 4 4 20
2020-11-03 13:54:38 6 5 4 20
2020-11-03 14:46:25 6 5 4 20
2020-11-03 14:57:31 7 5 4 10
2020-11-03 15:07:07 8 7 4 10

Ниже приведен код, который я попробовал, который я получил на stackoverflow:

 from pyspark.sql import Window from pyspark.sql.functions import last,first from pyspark.sql.functions import col, max as max_, min as min_ import sys  def stringReplaceFunc(x, y):  return F.when(x != y, x).otherwise(F.lit(None)) # replace with NULL  def forwardFillImputer(df, cols=[], partitioner="start_timestamp", value="null"):   for i in cols:  window = Window  .partitionBy(F.month(partitioner))  .orderBy(partitioner)  .rowsBetween(-sys.maxsize, 0)  df= df  .withColumn(i, stringReplaceFunc(F.col(i), value))  fill = F.last(df[i], ignorenulls=True).over(window)  df= df.withColumn(i, fill)  return df  df= forwardFillImputer(df, cols=[i for i in df.columns])  

код не работает, пожалуйста, дайте мне знать, какую ошибку я совершаю. Пожалуйста, дайте мне знать, если есть какое-либо альтернативное решение. Спасибо.

Комментарии:

1. @thePurplePython Привет, я нашел ваше решение, не могли бы вы помочь мне с этим.

Ответ №1:

В вашем текущем коде вы не должны разбивать окно по месяцам, и использование rowsBetween бесполезно. У вас должно быть только заказанное окно для каждого start_timestamp

Более того, вы не управляете случаем, когда нет последнего значения. Вы можете управлять им, используя coalesce буквальное значение '0'

Таким образом, ваш код может быть переписан следующим образом:

 from pyspark.sql import functions as F from pyspark.sql import Window  def forwardFillImputer(df, cols=[], partitioner='start_timestamp', value='null'):  for c in cols:  df = df.withColumn(c, F.when(F.col(c) != value, F.col(c)))  df = df.withColumn(c, F.coalesce(F.col(c), F.last(c, True).over(Window.orderBy(partitioner)), F.lit('0')))  return df  df = forwardFillImputer(df, df.columns)  

со следующим фреймом данных в виде df :

  ------------------- ------- ------- ------- -------  |start_timestamp |Column1|Column2|Column3|Column4|  ------------------- ------- ------- ------- -------  |2020-11-02 08:51:50|2 |null |null |null | |2020-11-02 09:14:29|null |null |null |40 | |2020-11-02 09:18:32|null |4 |2 |null | |2020-11-02 09:32:42|4 |null |null |null | |2020-11-03 13:06:03|null |null |null |20 | |2020-11-03 13:10:01|6 |null |4 |null | |2020-11-03 13:54:38|null |5 |null |null | |2020-11-03 14:46:25|null |null |null |null | |2020-11-03 14:57:31|7 |null |null |10 | |2020-11-03 15:07:07|8 |7 |null |null |  ------------------- ------- ------- ------- -------   

Вы получаете следующий результат:

  ------------------- ------- ------- ------- -------  |start_timestamp |Column1|Column2|Column3|Column4|  ------------------- ------- ------- ------- -------  |2020-11-02 08:51:50|2 |0 |0 |0 | |2020-11-02 09:14:29|2 |0 |0 |40 | |2020-11-02 09:18:32|2 |4 |2 |40 | |2020-11-02 09:32:42|4 |4 |2 |40 | |2020-11-03 13:06:03|4 |4 |2 |20 | |2020-11-03 13:10:01|6 |4 |4 |20 | |2020-11-03 13:54:38|6 |5 |4 |20 | |2020-11-03 14:46:25|6 |5 |4 |20 | |2020-11-03 14:57:31|7 |5 |4 |10 | |2020-11-03 15:07:07|8 |7 |4 |10 |  ------------------- ------- ------- ------- -------   

Комментарии:

1. Спасибо, код сработал, я внес небольшие изменения: def forwardFillImputer(data, cols=[], partitioner='resampled_start_timestamp', value='null'): for c in cols: data = data.withColumn(c, F.when(F.col(c).isNotNull(), F.col(c))) data = data.withColumn(c, F.coalesce(F.col(c), F.last(c, True).over(Window.orderBy(partitioner)), F.lit('0'))) return data data = forwardFillImputer(data, data.columns)