Как применить функции groupBy и aggregate к определенному окну в фрейме данных PySpark?

#python #apache-spark #pyspark #apache-spark-sql

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

Я хотел бы применить a groupBy и последующую agg функцию к фрейму данных PySpark, но только к определенному окну. Это лучше всего иллюстрируется примером. Предположим, что у меня есть набор данных с именем df :

 df.show()

     ----- ---------- ---------- ------- 
    |   ID| Timestamp| Condition|  Value|
     ----- ---------- ---------- ------- 
    |   z1|         1|         0|     50|
|-------------------------------------------|
|   |   z1|         2|         0|     51|   |
|   |   z1|         3|         0|     52|   |
|   |   z1|         4|         0|     51|   |
|   |   z1|         5|         1|     51|   |
|   |   z1|         6|         0|     49|   |
|   |   z1|         7|         0|     44|   |
|   |   z1|         8|         0|     46|   |
|-------------------------------------------|
    |   z1|         9|         0|     48|
    |   z1|        10|         0|     42|
  ----- ---------- ---------- ------- 
 

В частности, то, что я хотел бы сделать, это применить своего рода окно из — 3 строк к строке, где столбец Condition == 1 (т. Е. В данном случае строка 5). В этом окне, как показано в приведенном выше фрейме данных, я хотел бы найти минимальное значение столбца Value и соответствующее значение столбца Timestamp , получив таким образом:

  ---------- ---------- 
| Min_value| Timestamp|
 ---------- ---------- 
|        44|         7|
 ---------- ---------- 
 

Кто-нибудь знает, как это можно решить?

Заранее большое спасибо

Мариоанзас

Ответ №1:

Вы можете использовать окно, которое занимает промежуток между 3 предыдущими и 3 следующими строками, получить минимум и отфильтровать условие:

 from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'min',
    F.min(
        F.struct('Value', 'Timestamp')
    ).over(Window.partitionBy('ID').orderBy('Timestamp').rowsBetween(-3,3))
).filter('Condition = 1').select('min.*')

df2.show()
 ----- --------- 
|Value|Timestamp|
 ----- --------- 
|   44|        7|
 ----- --------- 
 

Комментарии:

1. Привет @mck! Большое спасибо за ваше предложение. Однако это определенно сработало бы для наборов данных, в которых содержится только одна строка Condition == 1 . Однако я не думаю, что это сработает для наборов данных, в которых у меня есть две или более строк Condition == 1 . Вы согласны?

2. @Marioanzas да, я понял, что это плохое решение. Я переписал его, и, надеюсь, теперь он будет выполнять работу намного лучше