#dataframe #apache-spark #apache-spark-sql
Вопрос:
У меня есть серия измерений с некоторыми нулевыми значениями, которые я хотел бы заполнить вперед, только если предыдущая строка была меньше двух часов назад (на основе столбца с меткой времени).
- если в строке есть значение, а следующие строки содержат значение null, но между ними менее двух часов, я хочу заполнить до следующего значения/последней строки.
- если между двумя строками более двух часов, следующие строки не должны заполняться вперед, пока снова не появится строка со значением.
Для тестирования я создаю свои входные данные следующим образом:
val df = Seq(
("a", 1624450756, Some(5.0)),
("a", 1624451416, None),
("a", 1624457957, None),
("a", 1624537816, None),
("a", 1624627800, Some(1.0)),
("a", 1624635000, None),
("a", 1624642199, None)).toDF("uuid", "ts", "value").withColumn("measurement_date", to_timestamp(from_unixtime(col("ts"))))
df.show
---- ---------- ----- -------------------
|uuid| ts|value| measurement_date|
---- ---------- ----- -------------------
| a|1624450756| 5.0|2021-06-23 14:19:16|
| a|1624451416| null|2021-06-23 14:30:16|
| a|1624457957| null|2021-06-23 16:19:17|
| a|1624537816| null|2021-06-24 14:30:16|
| a|1624627800| 1.0|2021-06-25 15:30:00|
| a|1624635000| null|2021-06-25 17:30:00|
| a|1624642199| null|2021-06-25 19:29:59|
---- ---------- ----- -------------------
После прямого заполнения я хочу, чтобы мои выходные данные выглядели так
---- ---------- ----- -------------------
|uuid| ts|value| measurement_date|
---- ---------- ----- -------------------
| a|1624450756| 5.0|2021-06-23 14:19:16|
| a|1624451416| 5.0|2021-06-23 14:30:16| // Filled with value from previous row
| a|1624457957| 5.0|2021-06-23 16:19:17| // Filled with value from previous row
| a|1624537816| null|2021-06-24 14:30:16| // Not filled since prev. row is not less than two hours
| a|1624627800| 1.0|2021-06-25 15:30:00|
| a|1624635000| null|2021-06-25 17:30:00| // Not filled since prev. row is not less than two hours
| a|1624642199| null|2021-06-25 19:29:59| // Not filled since previous row is null
---- ---------- ----- -------------------
Я попытался выполнить переадресацию, а затем выбрать значение, заполненное переадресацией, если разница между временными метками составляет менее двух часов, но это не сработает, так как оно не заполнится предыдущим нулем:
df
.withColumn("value_filled", last("value",true).over(Window.partitionBy("uuid").orderBy("ts")))
.withColumn("prev_ts", lag("ts",1).over(Window.partitionBy("uuid").orderBy("ts")))
.withColumn("value", when(col("ts")-col("prev_ts") < lit(7200), col("value_filled")).otherwise(col("value")))
.show
---- ---------- ----- ------------------- ------------ ----------
|uuid| ts|value| measurement_date|value_filled| prev_ts|
---- ---------- ----- ------------------- ------------ ----------
| a|1624450756| 5.0|2021-06-23 14:19:16| 5.0| null|
| a|1624451416| 5.0|2021-06-23 14:30:16| 5.0|1624450756|
| a|1624457957| 5.0|2021-06-23 16:19:17| 5.0|1624451416|
| a|1624537816| null|2021-06-24 14:30:16| 5.0|1624457957|
| a|1624627800| 1.0|2021-06-25 15:30:00| 1.0|1624537816|
| a|1624635000| null|2021-06-25 17:30:00| 1.0|1624627800|
| a|1624642199| 1.0|2021-06-25 19:29:59| 1.0|1624635000| // Filled wrongly since prev. row is less than two hours ago
---- ---------- ----- ------------------- ------------ ----------
As you can see in the output above, I falsely fill the last row because it is less than two hours after the previous one, even though the previous row contains null.
Now my question is how could achieve I this forward-fill based on the condition?
I’ve also tried using rangeBetween
, but that will only fill the next row and not the ones after that:
df
.withColumn("value_filled", last("value",true).over(Window.partitionBy("uuid").orderBy("ts").rangeBetween(-7199,Window.currentRow)))
.show
---- ---------- ----- ------------------- ------------
|uuid| ts|value| measurement_date|value_filled|
---- ---------- ----- ------------------- ------------
| a|1624450756| 5.0|2021-06-23 14:19:16| 5.0|
| a|1624451416| null|2021-06-23 14:30:16| 5.0|
| a|1624457957| null|2021-06-23 16:19:17| null| // This one should be filled
| a|1624537816| null|2021-06-24 14:30:16| null|
| a|1624627800| 1.0|2021-06-25 15:30:00| 1.0|
| a|1624635000| null|2021-06-25 17:30:00| null|
| a|1624642199| null|2021-06-25 19:29:59| null|
---- ---------- ----- ------------------- ------------