#pyspark #apache-spark-sql #pyspark-dataframes
#PySpark #apache-spark-sql #PySpark-фреймы данных
Вопрос:
Мой фрейм данных spark;
Client Date Due_Day
A 2017-01-01 Null
A 2017-02-01 Null
A 2017-03-01 Null
A 2017-04-01 Null
A 2017-05-01 Null
A 2017-06-01 35
A 2017-07-01 Null
A 2017-08-01 Null
A 2017-09-01 Null
A 2017-10-01 Null
A 2017-11-01 Null
A 2017-12-01 Null
B 2017-01-01 Null
B 2017-02-01 Null
B 2017-03-01 Null
B 2017-04-01 Null
B 2017-05-01 Null
B 2017-06-01 Null
B 2017-07-01 Null
B 2017-08-01 Null
B 2017-09-01 Null
B 2017-10-01 78
B 2017-11-01 Null
B 2017-12-01 Null
Для того же клиента во фрейме данных есть один ненулевой Due_Day.
Желаемый результат;
Client Date Due_Day Result
A 2017-01-01 Null Null
A 2017-02-01 Null Null
A 2017-03-01 Null Null
A 2017-04-01 Null Null
A 2017-05-01 Null Null -> This month should be remain as Null
A 2017-06-01 35 35
A 2017-07-01 Null Paid -> After one month should be 'Paid'
A 2017-08-01 Null OK -> Next Months should be 'OK'
A 2017-09-01 Null OK
A 2017-10-01 Null OK
A 2017-11-01 Null OK
A 2017-12-01 Null OK
B 2017-01-01 Null Null
B 2017-02-01 Null Null
B 2017-03-01 Null Null
B 2017-04-01 Null Null
B 2017-05-01 Null Null
B 2017-06-01 Null Null
B 2017-07-01 Null Null
B 2017-08-01 Null Null
B 2017-09-01 Null Null
B 2017-10-01 78 78
B 2017-11-01 Null Paid -> After one month
B 2017-12-01 Null OK
Для клиента он должен быть помечен как «оплаченный» за месяц после ненулевого значения Due_Day. И должен быть помечен как «OK» в течение следующих месяцев до конца года. Предыдущие месяцы снова должны оставаться нулевыми.
Не могли бы вы, пожалуйста, помочь мне с кодом pyspark по этому поводу?
Комментарии:
1. Как вы рассчитываете 31-60 дней или 61-90 дней?
2. это просто категориальное значение для значения Due_Day. Пожалуйста, проигнорируйте это.
3. Не могли бы вы, пожалуйста, проверить разделяемое решение .. буду очень признателен, если вы поможете принять и одобрить
Ответ №1:
Ниже может быть рабочее решение для вас, я попытаюсь объяснить логику решения —
- В тот момент, когда у вас есть какое-то значение в
due_day
столбце, мы делаем aforward fill
, чтобы заполнить следующие строки тем же значением для будущих вычислений. - Как только
forward_fill
столбец настроен, следующий становится простым, и мы можем написать базовую логику.
Создайте DF здесь
df = spark.createDataFrame([("A","2017-01-01", None),("A","2017-02-01", None),("A","2017-03-01", 35),("A","2017-04-01",None),("A","2017-05-01", None),("B","2017-01-01", None),("B","2017-02-01", 78),("B","2017-03-01", None),("B","2017-04-01", None),("B","2017-05-01", None)],["col1", "col2", "col3"])
df.show(truncate=False)
---- ---------- ----
|col1|col2 |col3|
---- ---------- ----
|A |2017-01-01|null|
|A |2017-02-01|null|
|A |2017-03-01|35 |
|A |2017-04-01|null|
|A |2017-05-01|null|
|B |2017-01-01|null|
|B |2017-02-01|78 |
|B |2017-03-01|null|
|B |2017-04-01|null|
|B |2017-05-01|null|
---- ---------- ----
Переадресация заполнения здесь, чтобы заполнить следующие строки теми же значениями
w = W.partitionBy("col1").orderBy("col2")
df = df.withColumn("filled_col", F.last("col3", ignorenulls=True).over(w))
df.show()
---- ---------- ---- ----------
|col1| col2|col3|filled_col|
---- ---------- ---- ----------
| B|2017-01-01|null| null|
| B|2017-02-01| 78| 78|
| B|2017-03-01|null| 78|
| B|2017-04-01|null| 78|
| B|2017-05-01|null| 78|
| A|2017-01-01|null| null|
| A|2017-02-01|null| null|
| A|2017-03-01| 35| 35|
| A|2017-04-01|null| 35|
| A|2017-05-01|null| 35|
---- ---------- ---- ----------
Мы назначим a row_number
в каждой строке, поскольку мы уже знаем, что если номер строки равен 2, то он будет оплачен и останется в порядке
w2 = W.partitionBy("col1", "filled_col").orderBy("col2")
df = df.withColumn("rnk", F.row_number().over(w2))
df = df.withColumn("Result", F.when(((F.col("filled_col").isNotNull()) amp; (F.col("rnk") ==F.lit("2"))), F.lit("Paid")).when(((F.col("filled_col").isNotNull()) amp; (F.col("rnk") > 2)), F.lit("OK")))
df.show()
---- ---------- ---- ---------- --- ------
|col1| col2|col3|filled_col|rnk|Result|
---- ---------- ---- ---------- --- ------
| B|2017-01-01|null| null| 1| null|
| B|2017-02-01| 78| 78| 1| null|
| B|2017-03-01|null| 78| 2| Paid|
| B|2017-04-01|null| 78| 3| OK|
| B|2017-05-01|null| 78| 4| OK|
| A|2017-01-01|null| null| 1| null|
| A|2017-02-01|null| null| 2| null|
| A|2017-03-01| 35| 35| 1| null|
| A|2017-04-01|null| 35| 2| Paid|
| A|2017-05-01|null| 35| 3| OK|
---- ---------- ---- ---------- --- ------
Выберите последний столбец по вашему выбору
df = df.withColumn("Result", F.coalesce("Result" , "col3"))
df.select("col1", "col2", "col3", "Result").orderBy("col1").show()
---- ---------- ---- ------
|col1| col2|col3|Result|
---- ---------- ---- ------
| A|2017-01-01|null| null|
| A|2017-02-01|null| null|
| A|2017-03-01| 35| 35|
| A|2017-04-01|null| Paid|
| A|2017-05-01|null| OK|
| B|2017-01-01|null| null|
| B|2017-02-01| 78| 78|
| B|2017-03-01|null| Paid|
| B|2017-04-01|null| OK|
| B|2017-05-01|null| OK|
---- ---------- ---- ------