#apache-spark #pyspark #apache-spark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
Итак, у меня есть этот фрейм данных
--- ------------- -----
| id| timestamp| num|
--- ------------- -----
| 10|1546300799000| 37.5|
| 10|1546300800000| null|
| 10|1546300801000| null|
| 10|1546300802000|37.51|
| 20|1546300804000| null|
| 10|1546300806000| 37.5|
| 10|1546300807000| null|
--- ------------- -----
Чего я пытаюсь добиться, так это того, что num
должно быть обновлено само значение, если оно присутствует, или, если null, «последнее» значение, полученное из предыдущей строки (упорядоченное по метке времени и сгруппированное по идентификатору)
Итак, это должно быть выведено
--- ------------- -----
| id| timestamp| num|
--- ------------- -----
| 10|1546300799000| 37.5|
| 10|1546300800000| 37.5|
| 10|1546300801000| 37.5|
| 10|1546300802000|37.51|
| 20|1546300804000| null|
| 10|1546300806000| 37.5|
| 10|1546300807000| 37.5|
--- ------------- -----
Я придумал это решение
w = Window.partitionBy('id').orderBy('timestamp')
final = joined.withColumn('num2', when(col('num').isNull(), lag(col('num')).over(w)).otherwise(col('num')))
но это результат, который я получаю
--- ------------- ----- -----
| id| timestamp| num| num2|
--- ------------- ----- -----
| 10|1546300799000| 37.5| 37.5|
| 10|1546300800000| null| 37.5|
| 10|1546300801000| null| null|
| 10|1546300802000|37.51|37.51|
| 20|1546300804000| null| null|
| 10|1546300806000| 37.5| 37.5|
| 10|1546300807000| null| 37.5|
--- ------------- ----- -----
как вы можете видеть, значение получает предыдущее значение, если оно равно нулю, но если вы посмотрите на третью строку, я получаю значение null, и я предполагаю, что оно получает значение 2-й строки, но когда оно все еще не обновляется (так что все еще null из исходного фрейма данных).
Я немного растерялся, как мне поступить. Любая помощь?
Ответ №1:
Вы хотите перенаправить заполнение меры, которая, к сожалению, не является чем-то встроенным в Pyspark, как в Pandas. Но есть обходной путь.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
window = Window.partitionBy('id')
.orderBy('timestamp')
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
final = joined.
withColumn('numFilled', F.last('num',ignorenulls = True).over(window)
Итак, что это делает, так это то, что оно создает ваше окно на основе ключа раздела и столбца порядка. Он также сообщает окну просмотреть предыдущие строки и перейти к текущей строке. Наконец, в каждой строке вы возвращаете последнее значение, которое не равно null (которое запоминается в соответствии с вашим окном, включая вашу текущую строку)
Комментарии:
1. Просто чтобы уточнить, это правильно, но прежде чем утвердить его, не могли бы вы изменить -10000 на Window.unboundedPreceding, чтобы он работал независимо от размера фрейма данных?
2. @Tizianoreica, имеет смысл. Обновлен ответ.