SparkSQL, если значение null, принимает предыдущее значение

#apache-spark #pyspark #apache-spark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

Итак, у меня есть этот фрейм данных

  --- ------------- ----- 
| id|    timestamp|  num|
 --- ------------- ----- 
| 10|1546300799000| 37.5|
| 10|1546300800000| null|
| 10|1546300801000| null|
| 10|1546300802000|37.51|
| 20|1546300804000| null|
| 10|1546300806000| 37.5|
| 10|1546300807000| null|
 --- ------------- ----- 
  

Чего я пытаюсь добиться, так это того, что num должно быть обновлено само значение, если оно присутствует, или, если null, «последнее» значение, полученное из предыдущей строки (упорядоченное по метке времени и сгруппированное по идентификатору)

Итак, это должно быть выведено

  --- ------------- ----- 
| id|    timestamp|  num|
 --- ------------- ----- 
| 10|1546300799000| 37.5|
| 10|1546300800000| 37.5|
| 10|1546300801000| 37.5|
| 10|1546300802000|37.51|
| 20|1546300804000| null|
| 10|1546300806000| 37.5|
| 10|1546300807000| 37.5|
 --- ------------- ----- 
  

Я придумал это решение

 w = Window.partitionBy('id').orderBy('timestamp')
final = joined.withColumn('num2', when(col('num').isNull(), lag(col('num')).over(w)).otherwise(col('num')))
  

но это результат, который я получаю

  --- ------------- ----- ----- 
| id|    timestamp|  num| num2|
 --- ------------- ----- ----- 
| 10|1546300799000| 37.5| 37.5|
| 10|1546300800000| null| 37.5|
| 10|1546300801000| null| null|
| 10|1546300802000|37.51|37.51|
| 20|1546300804000| null| null|
| 10|1546300806000| 37.5| 37.5|
| 10|1546300807000| null| 37.5|
 --- ------------- ----- ----- 
  

как вы можете видеть, значение получает предыдущее значение, если оно равно нулю, но если вы посмотрите на третью строку, я получаю значение null, и я предполагаю, что оно получает значение 2-й строки, но когда оно все еще не обновляется (так что все еще null из исходного фрейма данных).

Я немного растерялся, как мне поступить. Любая помощь?

Ответ №1:

Вы хотите перенаправить заполнение меры, которая, к сожалению, не является чем-то встроенным в Pyspark, как в Pandas. Но есть обходной путь.

 from pyspark.sql import functions as F
from pyspark.sql.window import Window

 window = Window.partitionBy('id')
           .orderBy('timestamp')
           .rowsBetween(Window.unboundedPreceding, Window.currentRow)

 final = joined.
               withColumn('numFilled', F.last('num',ignorenulls = True).over(window)
  

Итак, что это делает, так это то, что оно создает ваше окно на основе ключа раздела и столбца порядка. Он также сообщает окну просмотреть предыдущие строки и перейти к текущей строке. Наконец, в каждой строке вы возвращаете последнее значение, которое не равно null (которое запоминается в соответствии с вашим окном, включая вашу текущую строку)

Комментарии:

1. Просто чтобы уточнить, это правильно, но прежде чем утвердить его, не могли бы вы изменить -10000 на Window.unboundedPreceding, чтобы он работал независимо от размера фрейма данных?

2. @Tizianoreica, имеет смысл. Обновлен ответ.