Как найти дубликаты в pyspark dataframe, используя метод hash md5?

#python #dataframe #pyspark #apache-spark-sql

#python #фрейм данных #pyspark #apache-spark-sql

Вопрос:

У меня есть два входных фрейма данных, как показано ниже, и я хочу найти повторяющиеся строки, используя метод хэша.

Входной фрейм данных 1: df1

 |a |b |c |
|5 |2 |3 |
|1 |5 |4 |
|1 |5 |3 |
  

Входной фрейм данных 2: df2

 |a |b |c |
|5 |2 |3 |
|5 |2 |3 |
|1 |5 |4 |
|1 |5 |3 |
  

Подготовка col_list и нахождение хэша во входных столбцах

 col_list = ['a', 'b', 'c']
df1 = df1.withColumn("hash", md5(concat_ws(' ', *col_list)))
df2 = df2.withColumn("hash", md5(concat_ws(' ', *col_list)))

upd: df1               upd: df2
|a |b |c |hash  |      |a |b |c |hash  |
|5 |2 |3 |sfsd23|      |5 |2 |3 |sfsd23|    
|1 |5 |4 |fsd345|      |5 |2 |3 |sfsd23|
|1 |5 |3 |54sgsr|      |1 |5 |4 |fsd345|
                       |1 |5 |3 |54sgsr|

df_diff = df1.select(df1.hash).substract(df2.select(df2.hash))
  

df_diff.show() — Ничего


Он не показывает разницу, поскольку хэш соответствует 1 записи хэш-значения из df1, и 2 записи с одинаковым хэш-значением с df2 одинаковы. Но, как найти дубликат в этом после, тогда я хочу один раз вызвать некоторую ошибку, если найду.

Комментарии:

1. вы могли бы попробовать левое антисоединение в столбце has df_diff = df1.join(df2, ['hash'], 'left_anti')

2. Но вы получите все четыре строки, поэтому вы хотите, чтобы я после этого нашел проверку дубликатов???

3. left_anti — выдает тот же результат, что и subtract.

Ответ №1:

Использование .exceptAll (из Spark-2.4 ) вместо .substract as .exceptAll сохраняет все дублированные строки, используя df2 в качестве исходного фрейма данных.

From docs:

вычесть:

 Return a new DataFrame containing rows in this DataFrame but not in another DataFrame.

This is equivalent to EXCEPT DISTINCT in SQL.
  

За исключением:

 Return a new DataFrame containing rows in this DataFrame but not in another DataFrame while preserving duplicates.

This is equivalent to EXCEPT ALL in SQL.
  

Example:

From Spark-2.4 :

 df2.exceptAll(df1).show(10,False)
# --- --- --- -------------------------------- 
#|a  |b  |c  |hash                            |
# --- --- --- -------------------------------- 
#|5  |2  |3  |747d9c66398e89fbda6570f6bf945ed6|
# --- --- --- -------------------------------- 
  

For Spark<2.4:

Нам нужно использовать Spark window row_number functi для последующего объединения по левому краю, чтобы найти дублированные записи.

Example:

 from pyspark.sql.functions import row_number
from pyspark.sql import Window

w=Window.partitionBy(*['a', 'b', 'c', 'hash']).orderBy(lit(1))


df1=df1.withColumn("rn",row_number().over(w))
df2=df2.withColumn("rn",row_number().over(w))

df2.alias("d2").join(df1.alias("d1"),(df1["a"]==df2["a"]) amp; (df1["b"]==df2["b"]) amp; (df1["c"]==df2["c"]) amp; (df1["hash"]==df2["hash"]) amp; (df1["rn"]==df2["rn"]),'left').
filter(col("d1.rn").isNull()).
select("d2.*").
drop("rn").
show()

# --- --- --- -------------------------------- 
#|a  |b  |c  |hash                            |
# --- --- --- -------------------------------- 
#|5  |2  |3  |747d9c66398e89fbda6570f6bf945ed6|
# --- --- --- -------------------------------- 
  

Комментарии:

1. Спасибо, все исключения есть в выпуске spark 2.4. Но мы все еще используем версию 2.3.*. Значит, exceptALL не работает? Мы можем сделать это любым другим способом. Ваша идея идеальна, но версия убивает нас.

2. @Rocky1989, пожалуйста, проверьте обновленный ответ для Spark<2.4 ..!