Создать флаг, если идентификатор сопоставляется с несколькими другими идентификаторами в окне

#apache-spark #pyspark

#apache-spark #pyspark

Вопрос:

Я пытаюсь создать флаг, который показывает, какие id1 значения имеют повторяющееся сопоставление с id2 произвольным окном n ( n = 2 в примере ниже). Например, учитывая этот фрейм данных:

 df = spark.createDataFrame(
[("2010-03-10", "A", "X"),
 ("2010-03-10", "A", "Y"),
 ("2010-03-10", "B", "Z"),
 ("2010-04-10", "A", "X"),
 ("2010-04-10", "A", "Y"),
 ("2010-04-10", "B", "Z"),
 ("2010-05-10", "A", "X"),
 ("2010-05-10", "A", "Y"),
 ("2010-05-10", "B", "Z"),
 ("2010-06-10", "A", "X"),
 ("2010-06-10", "B", "Z"),
 ("2010-07-10", "A", "X"),
 ("2010-07-10", "B", "Z")],
 ("date", "id1", "id2")
)
df.show()

 ---------- --- --- 
|      date|id1|id2|
 ---------- --- --- 
|2010-03-10|  A|  X|
|2010-03-10|  A|  Y|
|2010-03-10|  B|  Z|
|2010-04-10|  A|  X|
|2010-04-10|  A|  Y|
|2010-04-10|  B|  Z|
|2010-05-10|  A|  X|
|2010-05-10|  A|  Y|
|2010-05-10|  B|  Z|
|2010-06-10|  A|  X|
|2010-06-10|  B|  Z|
|2010-07-10|  A|  X|
|2010-07-10|  B|  Z|
 ---------- --- --- 
 

С ожидаемым результатом:

  ---------- --- --- -------- 
|      date|id1|id2|dup_flag|
 ---------- --- --- -------- 
|2010-03-10|  A|  X|       1|
|2010-03-10|  A|  Y|       1|
|2010-03-10|  B|  Z|       0|
|2010-04-10|  A|  X|       1|
|2010-04-10|  A|  Y|       1|
|2010-04-10|  B|  Z|       0|
|2010-05-10|  A|  X|       1|
|2010-05-10|  A|  Y|       1|
|2010-05-10|  B|  Z|       0|
|2010-06-10|  A|  X|       1|
|2010-06-10|  B|  Z|       0|
|2010-07-10|  A|  X|       0|
|2010-07-10|  B|  Z|       0|
 ---------- --- --- -------- 
 

Где dup_flag — флаг, указывающий, содержит ли id1 дубликат сопоставления id2 где-то за последние 2 месяца.

То есть, для id1 B , он никогда не сопоставляется ни с каким другим id2 , чем Z , следовательно, он никогда не помечается как дубликат. Потому id1 A что у нас есть повторяющиеся сопоставления. В этом примере я установил размер окна 2 равным , что означает, что если A сопоставляется с несколькими id2 в пределах любой даты окна, которая идет не более одного месяца назад (т. Е. В окне должен быть включен текущий месяц, а также прошлый месяц), он должен получить флаг. Следовательно, A получает этот флаг до тех пор, пока 2010-07-10 , как в этот период, у нас не появится окно, состоящее из обоих 2010-06-10 и 2010-07-10 , в котором A не сопоставляется с несколькими id2 (оно сопоставляется только id2 X в этом окне).

Комментарии:

1. Для строки |2010-07-10| A| X| 0| не должен ли флаг быть 1, потому что в 2020-05-10 есть повторяющиеся записи?

2. Флаг должен быть равен 0, потому что для размера окна 2 я бы хотел посмотреть только на 2010-06-10 дату и 2010-07-10 когда 2010-07-10 . 2010-05-10 в этом случае будет игнорироваться.

3. но вы сказали within any date of a window that goes at most two months back 😉

4. Вы правы, ошибка с моей стороны. Исправлено в тексте сейчас!

Ответ №1:

Несколько хакерский способ решить эту проблему. По сути, вы создаете окно, основанное на диапазоне, а не на строке, и выполняете a count distinct над этим окном. Каким-то образом Spark не поддерживает последнее, поэтому мне нужно было обойти его, используя size(collect_set) , по сути, то же count distinct самое, что и . Также Spark не поддерживает окна на основе диапазона, упорядоченные по датам, поэтому мне нужно было привести столбец к целочисленному типу и использовать магическое число 2678400, которое равно количеству секунд в 31 день.

 import pyspark.sql.functions as F
from pyspark.sql.window import Window

### initialize df as in the question
### df = ...

df = df.withColumn('date', F.col('date').cast('timestamp').cast('bigint'))
flag = (F.size(F.collect_set(F.col('id2')).over(Window.partitionBy('id1').orderBy('date').rangeBetween(-2678400, 0))) > 1).cast('int')
df = df.withColumn('flag', flag)
df = df.withColumn('date', F.col('date').cast('timestamp').cast('date'))
 

Идея заключается в чем-то вроде

 COUNT DISTINCT id2 OVER (PARTITION BY id1 ORDER BY date RANGE BETWEEN INTERVAL '1' MONTH PRECEDING AND CURRENT ROW)
 

Комментарии:

1. Спасибо за ответ. Я вижу, что решение не помечается 2010-06-10 как дублирующая запись для A , хотя в нем есть дублирующее отображение 2010-05-10 . Возможно, это связано с тем, что не каждый месяц имеет ровно 30 дней.

2. Да, тогда, возможно, мне следует использовать 31 вместо

3. Я считаю, что это все равно вызовет проблемы, учитывая, что не в каждом месяце ровно 31 день. Похоже, что эта проблема не так тривиальна, как я предполагал, большое вам спасибо за помощь, несмотря ни на что.

4. Ах, я должен был извлечь месяц и преобразовать в целое число, если вас интересует только это!

5. Но тогда возникнет проблема года, например, «2010-01» против «2009-12»