#apache-spark #pyspark
#apache-spark #pyspark
Вопрос:
Я пытаюсь создать флаг, который показывает, какие id1
значения имеют повторяющееся сопоставление с id2
произвольным окном n
( n = 2
в примере ниже). Например, учитывая этот фрейм данных:
df = spark.createDataFrame(
[("2010-03-10", "A", "X"),
("2010-03-10", "A", "Y"),
("2010-03-10", "B", "Z"),
("2010-04-10", "A", "X"),
("2010-04-10", "A", "Y"),
("2010-04-10", "B", "Z"),
("2010-05-10", "A", "X"),
("2010-05-10", "A", "Y"),
("2010-05-10", "B", "Z"),
("2010-06-10", "A", "X"),
("2010-06-10", "B", "Z"),
("2010-07-10", "A", "X"),
("2010-07-10", "B", "Z")],
("date", "id1", "id2")
)
df.show()
---------- --- ---
| date|id1|id2|
---------- --- ---
|2010-03-10| A| X|
|2010-03-10| A| Y|
|2010-03-10| B| Z|
|2010-04-10| A| X|
|2010-04-10| A| Y|
|2010-04-10| B| Z|
|2010-05-10| A| X|
|2010-05-10| A| Y|
|2010-05-10| B| Z|
|2010-06-10| A| X|
|2010-06-10| B| Z|
|2010-07-10| A| X|
|2010-07-10| B| Z|
---------- --- ---
С ожидаемым результатом:
---------- --- --- --------
| date|id1|id2|dup_flag|
---------- --- --- --------
|2010-03-10| A| X| 1|
|2010-03-10| A| Y| 1|
|2010-03-10| B| Z| 0|
|2010-04-10| A| X| 1|
|2010-04-10| A| Y| 1|
|2010-04-10| B| Z| 0|
|2010-05-10| A| X| 1|
|2010-05-10| A| Y| 1|
|2010-05-10| B| Z| 0|
|2010-06-10| A| X| 1|
|2010-06-10| B| Z| 0|
|2010-07-10| A| X| 0|
|2010-07-10| B| Z| 0|
---------- --- --- --------
Где dup_flag
— флаг, указывающий, содержит ли id1 дубликат сопоставления id2
где-то за последние 2 месяца.
То есть, для id1
B
, он никогда не сопоставляется ни с каким другим id2
, чем Z
, следовательно, он никогда не помечается как дубликат. Потому id1
A
что у нас есть повторяющиеся сопоставления. В этом примере я установил размер окна 2
равным , что означает, что если A
сопоставляется с несколькими id2
в пределах любой даты окна, которая идет не более одного месяца назад (т. Е. В окне должен быть включен текущий месяц, а также прошлый месяц), он должен получить флаг. Следовательно, A
получает этот флаг до тех пор, пока 2010-07-10
, как в этот период, у нас не появится окно, состоящее из обоих 2010-06-10
и 2010-07-10
, в котором A
не сопоставляется с несколькими id2
(оно сопоставляется только id2
X
в этом окне).
Комментарии:
1. Для строки
|2010-07-10| A| X| 0|
не должен ли флаг быть 1, потому что в 2020-05-10 есть повторяющиеся записи?2. Флаг должен быть равен 0, потому что для размера окна
2
я бы хотел посмотреть только на2010-06-10
дату и2010-07-10
когда2010-07-10
.2010-05-10
в этом случае будет игнорироваться.3. но вы сказали
within any date of a window that goes at most two months back
😉4. Вы правы, ошибка с моей стороны. Исправлено в тексте сейчас!
Ответ №1:
Несколько хакерский способ решить эту проблему. По сути, вы создаете окно, основанное на диапазоне, а не на строке, и выполняете a count distinct
над этим окном. Каким-то образом Spark не поддерживает последнее, поэтому мне нужно было обойти его, используя size(collect_set)
, по сути, то же count distinct
самое, что и . Также Spark не поддерживает окна на основе диапазона, упорядоченные по датам, поэтому мне нужно было привести столбец к целочисленному типу и использовать магическое число 2678400, которое равно количеству секунд в 31 день.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
### initialize df as in the question
### df = ...
df = df.withColumn('date', F.col('date').cast('timestamp').cast('bigint'))
flag = (F.size(F.collect_set(F.col('id2')).over(Window.partitionBy('id1').orderBy('date').rangeBetween(-2678400, 0))) > 1).cast('int')
df = df.withColumn('flag', flag)
df = df.withColumn('date', F.col('date').cast('timestamp').cast('date'))
Идея заключается в чем-то вроде
COUNT DISTINCT id2 OVER (PARTITION BY id1 ORDER BY date RANGE BETWEEN INTERVAL '1' MONTH PRECEDING AND CURRENT ROW)
Комментарии:
1. Спасибо за ответ. Я вижу, что решение не помечается
2010-06-10
как дублирующая запись дляA
, хотя в нем есть дублирующее отображение2010-05-10
. Возможно, это связано с тем, что не каждый месяц имеет ровно 30 дней.2. Да, тогда, возможно, мне следует использовать 31 вместо
3. Я считаю, что это все равно вызовет проблемы, учитывая, что не в каждом месяце ровно 31 день. Похоже, что эта проблема не так тривиальна, как я предполагал, большое вам спасибо за помощь, несмотря ни на что.
4. Ах, я должен был извлечь месяц и преобразовать в целое число, если вас интересует только это!
5. Но тогда возникнет проблема года, например, «2010-01» против «2009-12»