Искра условное прямое заполнение во временном окне

#dataframe #apache-spark #apache-spark-sql

Вопрос:

У меня есть серия измерений с некоторыми нулевыми значениями, которые я хотел бы заполнить вперед, только если предыдущая строка была меньше двух часов назад (на основе столбца с меткой времени).

  • если в строке есть значение, а следующие строки содержат значение null, но между ними менее двух часов, я хочу заполнить до следующего значения/последней строки.
  • если между двумя строками более двух часов, следующие строки не должны заполняться вперед, пока снова не появится строка со значением.

Для тестирования я создаю свои входные данные следующим образом:

 val df = Seq(
  ("a", 1624450756, Some(5.0)),
  ("a", 1624451416, None),
  ("a", 1624457957, None),
  ("a", 1624537816, None),
  ("a", 1624627800, Some(1.0)),
  ("a", 1624635000, None),
  ("a", 1624642199, None)).toDF("uuid", "ts", "value").withColumn("measurement_date", to_timestamp(from_unixtime(col("ts"))))

df.show
 ---- ---------- ----- ------------------- 
|uuid|        ts|value|   measurement_date|
 ---- ---------- ----- ------------------- 
|   a|1624450756|  5.0|2021-06-23 14:19:16|
|   a|1624451416| null|2021-06-23 14:30:16|
|   a|1624457957| null|2021-06-23 16:19:17|
|   a|1624537816| null|2021-06-24 14:30:16|
|   a|1624627800|  1.0|2021-06-25 15:30:00|
|   a|1624635000| null|2021-06-25 17:30:00|
|   a|1624642199| null|2021-06-25 19:29:59|
 ---- ---------- ----- ------------------- 
 

После прямого заполнения я хочу, чтобы мои выходные данные выглядели так

  ---- ---------- ----- ------------------- 
|uuid|        ts|value|   measurement_date|
 ---- ---------- ----- ------------------- 
|   a|1624450756|  5.0|2021-06-23 14:19:16|
|   a|1624451416|  5.0|2021-06-23 14:30:16| // Filled with value from previous row
|   a|1624457957|  5.0|2021-06-23 16:19:17| // Filled with value from previous row
|   a|1624537816| null|2021-06-24 14:30:16| // Not filled since prev. row is not less than two hours
|   a|1624627800|  1.0|2021-06-25 15:30:00|
|   a|1624635000| null|2021-06-25 17:30:00| // Not filled since prev. row is not less than two hours
|   a|1624642199| null|2021-06-25 19:29:59| // Not filled since previous row is null
 ---- ---------- ----- ------------------- 
 

Я попытался выполнить переадресацию, а затем выбрать значение, заполненное переадресацией, если разница между временными метками составляет менее двух часов, но это не сработает, так как оно не заполнится предыдущим нулем:

 df
  .withColumn("value_filled", last("value",true).over(Window.partitionBy("uuid").orderBy("ts")))
  .withColumn("prev_ts", lag("ts",1).over(Window.partitionBy("uuid").orderBy("ts")))
  .withColumn("value", when(col("ts")-col("prev_ts") < lit(7200), col("value_filled")).otherwise(col("value")))
  .show
 ---- ---------- ----- ------------------- ------------ ---------- 
|uuid|        ts|value|   measurement_date|value_filled|   prev_ts|
 ---- ---------- ----- ------------------- ------------ ---------- 
|   a|1624450756|  5.0|2021-06-23 14:19:16|         5.0|      null|
|   a|1624451416|  5.0|2021-06-23 14:30:16|         5.0|1624450756|
|   a|1624457957|  5.0|2021-06-23 16:19:17|         5.0|1624451416|
|   a|1624537816| null|2021-06-24 14:30:16|         5.0|1624457957|
|   a|1624627800|  1.0|2021-06-25 15:30:00|         1.0|1624537816|
|   a|1624635000| null|2021-06-25 17:30:00|         1.0|1624627800|
|   a|1624642199|  1.0|2021-06-25 19:29:59|         1.0|1624635000| // Filled wrongly since prev. row is less than two hours ago
 ---- ---------- ----- ------------------- ------------ ---------- 
 

As you can see in the output above, I falsely fill the last row because it is less than two hours after the previous one, even though the previous row contains null.

Now my question is how could achieve I this forward-fill based on the condition?

I’ve also tried using rangeBetween , but that will only fill the next row and not the ones after that:

 df
  .withColumn("value_filled", last("value",true).over(Window.partitionBy("uuid").orderBy("ts").rangeBetween(-7199,Window.currentRow)))
  .show
 ---- ---------- ----- ------------------- ------------ 
|uuid|        ts|value|   measurement_date|value_filled|
 ---- ---------- ----- ------------------- ------------ 
|   a|1624450756|  5.0|2021-06-23 14:19:16|         5.0|
|   a|1624451416| null|2021-06-23 14:30:16|         5.0|
|   a|1624457957| null|2021-06-23 16:19:17|        null| // This one should be filled
|   a|1624537816| null|2021-06-24 14:30:16|        null|
|   a|1624627800|  1.0|2021-06-25 15:30:00|         1.0|
|   a|1624635000| null|2021-06-25 17:30:00|        null|
|   a|1624642199| null|2021-06-25 19:29:59|        null|
 ---- ---------- ----- ------------------- ------------