PySpark заполняет некоторые конкретные пропущенные значения

#pyspark #apache-spark-sql #pyspark-dataframes

#PySpark #apache-spark-sql #PySpark-фреймы данных

Вопрос:

Мой фрейм данных spark;

 Client  Date        Due_Day
A      2017-01-01   Null
A      2017-02-01   Null
A      2017-03-01   Null
A      2017-04-01   Null
A      2017-05-01   Null
A      2017-06-01   35
A      2017-07-01   Null
A      2017-08-01   Null
A      2017-09-01   Null
A      2017-10-01   Null
A      2017-11-01   Null
A      2017-12-01   Null
B      2017-01-01   Null
B      2017-02-01   Null
B      2017-03-01   Null
B      2017-04-01   Null
B      2017-05-01   Null
B      2017-06-01   Null
B      2017-07-01   Null
B      2017-08-01   Null
B      2017-09-01   Null
B      2017-10-01   78
B      2017-11-01   Null
B      2017-12-01   Null
  

Для того же клиента во фрейме данных есть один ненулевой Due_Day.

Желаемый результат;

 Client  Date    Due_Day    Result
A   2017-01-01  Null       Null
A   2017-02-01  Null       Null
A   2017-03-01  Null       Null
A   2017-04-01  Null       Null
A   2017-05-01  Null       Null         -> This month should be remain as Null
A   2017-06-01  35         35
A   2017-07-01  Null       Paid         -> After one month should be 'Paid'
A   2017-08-01  Null       OK           -> Next Months should be 'OK'
A   2017-09-01  Null       OK
A   2017-10-01  Null       OK
A   2017-11-01  Null       OK
A   2017-12-01  Null       OK
B   2017-01-01  Null       Null
B   2017-02-01  Null       Null
B   2017-03-01  Null       Null
B   2017-04-01  Null       Null
B   2017-05-01  Null       Null
B   2017-06-01  Null       Null
B   2017-07-01  Null       Null
B   2017-08-01  Null       Null
B   2017-09-01  Null       Null
B   2017-10-01  78         78
B   2017-11-01  Null       Paid         -> After one month
B   2017-12-01  Null       OK
  

Для клиента он должен быть помечен как «оплаченный» за месяц после ненулевого значения Due_Day. И должен быть помечен как «OK» в течение следующих месяцев до конца года. Предыдущие месяцы снова должны оставаться нулевыми.

Не могли бы вы, пожалуйста, помочь мне с кодом pyspark по этому поводу?

Комментарии:

1. Как вы рассчитываете 31-60 дней или 61-90 дней?

2. это просто категориальное значение для значения Due_Day. Пожалуйста, проигнорируйте это.

3. Не могли бы вы, пожалуйста, проверить разделяемое решение .. буду очень признателен, если вы поможете принять и одобрить

Ответ №1:

Ниже может быть рабочее решение для вас, я попытаюсь объяснить логику решения —

  1. В тот момент, когда у вас есть какое-то значение в due_day столбце, мы делаем a forward fill , чтобы заполнить следующие строки тем же значением для будущих вычислений.
  2. Как только forward_fill столбец настроен, следующий становится простым, и мы можем написать базовую логику.

Создайте DF здесь

 df = spark.createDataFrame([("A","2017-01-01", None),("A","2017-02-01", None),("A","2017-03-01", 35),("A","2017-04-01",None),("A","2017-05-01", None),("B","2017-01-01", None),("B","2017-02-01", 78),("B","2017-03-01", None),("B","2017-04-01", None),("B","2017-05-01", None)],["col1", "col2", "col3"])
df.show(truncate=False) 
 ---- ---------- ---- 
|col1|col2      |col3|
 ---- ---------- ---- 
|A   |2017-01-01|null|
|A   |2017-02-01|null|
|A   |2017-03-01|35  |
|A   |2017-04-01|null|
|A   |2017-05-01|null|
|B   |2017-01-01|null|
|B   |2017-02-01|78  |
|B   |2017-03-01|null|
|B   |2017-04-01|null|
|B   |2017-05-01|null|
 ---- ---------- ---- 
  

Переадресация заполнения здесь, чтобы заполнить следующие строки теми же значениями

 w = W.partitionBy("col1").orderBy("col2")
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w))
df.show()
 ---- ---------- ---- ---------- 
|col1|      col2|col3|filled_col|
 ---- ---------- ---- ---------- 
|   B|2017-01-01|null|      null|
|   B|2017-02-01|  78|        78|
|   B|2017-03-01|null|        78|
|   B|2017-04-01|null|        78|
|   B|2017-05-01|null|        78|
|   A|2017-01-01|null|      null|
|   A|2017-02-01|null|      null|
|   A|2017-03-01|  35|        35|
|   A|2017-04-01|null|        35|
|   A|2017-05-01|null|        35|
 ---- ---------- ---- ---------- 
  

Мы назначим a row_number в каждой строке, поскольку мы уже знаем, что если номер строки равен 2, то он будет оплачен и останется в порядке

     w2 = W.partitionBy("col1", "filled_col").orderBy("col2")
df = df.withColumn("rnk", F.row_number().over(w2))
df = df.withColumn("Result", F.when(((F.col("filled_col").isNotNull()) amp; (F.col("rnk") ==F.lit("2"))), F.lit("Paid")).when(((F.col("filled_col").isNotNull()) amp; (F.col("rnk") > 2)), F.lit("OK")))
df.show()
 ---- ---------- ---- ---------- --- ------ 
|col1|      col2|col3|filled_col|rnk|Result|
 ---- ---------- ---- ---------- --- ------ 
|   B|2017-01-01|null|      null|  1|  null|
|   B|2017-02-01|  78|        78|  1|  null|
|   B|2017-03-01|null|        78|  2|  Paid|
|   B|2017-04-01|null|        78|  3|    OK|
|   B|2017-05-01|null|        78|  4|    OK|
|   A|2017-01-01|null|      null|  1|  null|
|   A|2017-02-01|null|      null|  2|  null|
|   A|2017-03-01|  35|        35|  1|  null|
|   A|2017-04-01|null|        35|  2|  Paid|
|   A|2017-05-01|null|        35|  3|    OK|
 ---- ---------- ---- ---------- --- ------ 
  

Выберите последний столбец по вашему выбору

 df = df.withColumn("Result", F.coalesce("Result" , "col3"))
df.select("col1", "col2", "col3", "Result").orderBy("col1").show()
 ---- ---------- ---- ------ 
|col1|      col2|col3|Result|
 ---- ---------- ---- ------ 
|   A|2017-01-01|null|  null|
|   A|2017-02-01|null|  null|
|   A|2017-03-01|  35|    35|
|   A|2017-04-01|null|  Paid|
|   A|2017-05-01|null|    OK|
|   B|2017-01-01|null|  null|
|   B|2017-02-01|  78|    78|
|   B|2017-03-01|null|  Paid|
|   B|2017-04-01|null|    OK|
|   B|2017-05-01|null|    OK|
 ---- ---------- ---- ------