Функция вывода окна Pyspark со значением смещения от значения другого столбца

#apache-spark #pyspark #apache-spark-sql #rolling-computation

#apache-spark #pyspark #apache-spark-sql #rolling-вычисление

Вопрос:

Я пытаюсь получить новый столбец, используя функцию вывода окна, но мое значение смещения для функции вывода варьируется в зависимости от значения столбца, вот мои примерные данные

 inputdata = (("James", "Sales", 3000), 
    ("James", "Sales", 4600),  
    ("James", "Sales", 4100),   
    ("James", "Finance", 3000),  
    ("James", "Sales", 3000),    
    ("James", "Finance", 3300),  
    ("James", "Finance", 3900),    
    ("Kumar", "Marketing", 3000), 
    ("Kumar", "Marketing", 2000),
    ("James", "Sales", 4100) 
  )
  
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = inputdata, schema = columns)
 

ввод

  ------------- ---------- ------ 
|employee_name|department|salary|
 ------------- ---------- ------ 
|James        |Sales     |3000  |
|James        |Sales     |3000  |
|James        |Sales     |4100  |
|James        |Sales     |4100  |
|James        |Sales     |4600  |
|James        |Finance   |3000  |
|James        |Finance   |3300  |
|James        |Finance   |3900  |
|Kumar        |Marketing |2000  |
|Kumar        |Marketing |3000  |
 ------------- ---------- ------ 
 

ожидаемый результат:

Здесь у меня есть столбец с именем ожидаемое значение

оно должно быть получено из столбца зарплаты, а значение должно быть следующим немедленным большим значением

если следующее значение столбца такое же, то оно должно учитывать следующее значение, пока не найдет другое значение,

в приведенном ниже примере ожидаемое значение для первых двух столбцов равно 4100, что является следующим большим значением, здесь 9999 — значение по умолчанию

введите описание изображения здесь

Я использовал функцию вывода окна, но проблема в том, что смещение должно быть постоянным, я имею в виду, что вывод может продолжаться только до фиксированного количества записей, есть ли способ решить это?

это то, что я пробовал:

создан столбец смещения, который, функция вывода должна идти далеко

 windowSpec  = Window.partitionBy("employee_name","department","salary").orderBy('employee_name','department','salary')
dfk = df.withColumn("row_number",row_number().over(windowSpec))
dfk = dfk.withColumn("max_row_number", max(col("row_number") 1).over(windowSpec))
dfk = dfk.orderBy("employee_name","department","salary").withColumn('offset', dfk['max_row_number']-dfk['row_number'])

 ------------- ---------- ------ ---------- -------------- ------ 
|employee_name|department|salary|row_number|max_row_number|offset|
 ------------- ---------- ------ ---------- -------------- ------ 
|        James|   Finance|  3000|         1|             2|     1|
|        James|   Finance|  3300|         2|             3|     1|
|        James|   Finance|  3900|         3|             4|     1|
|        James|     Sales|  3000|         1|             3|     2|
|        James|     Sales|  3000|         2|             3|     1|
|        James|     Sales|  4100|         3|             5|     2|
|        James|     Sales|  4100|         4|             5|     1|
|        James|     Sales|  4600|         5|             6|     1|
|        Kumar| Marketing|  2000|         1|             2|     1|
|        Kumar| Marketing|  3000|         2|             3|     1|
 ------------- ---------- ------ ---------- -------------- ------ 
 

но я не могу передать столбец смещения как смещение в функцию вывода или что-то в этом роде

 windowSpec_2  = Window.partitionBy("employee_name","department").orderBy('employee_name','department','salary','row_number')
dfk.withColumn("*** expected value ****",lead('salary', **dfk['offset']**, 9999).over(windowSpec_2)).show()
 

Я пробовал использовать express / eval, но я все еще вижу

итерация вызывает TypeError(«Столбец не повторяется») Ошибка типа: столбец не повторяется

есть ли какой — либо способ достичь вышеуказанных ожидаемых результатов ?

заранее спасибо

Ответ №1:

Ваш вариант использования немного сложный, поэтому я думаю lead , что это может быть неуместно. Возможно, вам лучше использовать rangeBetweeen окно и получить min значение в этом окне:

 import pyspark.sql.functions as F
from pyspark.sql.window import Window

df2 = df.withColumn(
    'expected_value',
    F.coalesce(
        F.min('salary').over(
            Window.partitionBy(
                "employee_name","department"
            ).orderBy(
                'salary'
            ).rangeBetween(
                1, Window.unboundedFollowing
            )
        ),
        F.lit(9999)
    )
)

df2.show()
 ------------- ---------- ------ -------------- 
|employee_name|department|salary|expected_value|
 ------------- ---------- ------ -------------- 
|        James|     Sales|  3000|          4100|
|        James|     Sales|  3000|          4100|
|        James|     Sales|  4100|          4600|
|        James|     Sales|  4100|          4600|
|        James|     Sales|  4600|          9999|
|        Kumar| Marketing|  2000|          3000|
|        Kumar| Marketing|  3000|          9999|
|        James|   Finance|  3000|          3300|
|        James|   Finance|  3300|          3900|
|        James|   Finance|  3900|          9999|
 ------------- ---------- ------ --------------