#python #dataframe #pyspark #apache-spark-sql
#python #фрейм данных #pyspark #apache-spark-sql
Вопрос:
У меня есть два входных фрейма данных, как показано ниже, и я хочу найти повторяющиеся строки, используя метод хэша.
Входной фрейм данных 1: df1
|a |b |c |
|5 |2 |3 |
|1 |5 |4 |
|1 |5 |3 |
Входной фрейм данных 2: df2
|a |b |c |
|5 |2 |3 |
|5 |2 |3 |
|1 |5 |4 |
|1 |5 |3 |
Подготовка col_list и нахождение хэша во входных столбцах
col_list = ['a', 'b', 'c']
df1 = df1.withColumn("hash", md5(concat_ws(' ', *col_list)))
df2 = df2.withColumn("hash", md5(concat_ws(' ', *col_list)))
upd: df1 upd: df2
|a |b |c |hash | |a |b |c |hash |
|5 |2 |3 |sfsd23| |5 |2 |3 |sfsd23|
|1 |5 |4 |fsd345| |5 |2 |3 |sfsd23|
|1 |5 |3 |54sgsr| |1 |5 |4 |fsd345|
|1 |5 |3 |54sgsr|
df_diff = df1.select(df1.hash).substract(df2.select(df2.hash))
df_diff.show() — Ничего
Он не показывает разницу, поскольку хэш соответствует 1 записи хэш-значения из df1, и 2 записи с одинаковым хэш-значением с df2 одинаковы. Но, как найти дубликат в этом после, тогда я хочу один раз вызвать некоторую ошибку, если найду.
Комментарии:
1. вы могли бы попробовать левое антисоединение в столбце has
df_diff = df1.join(df2, ['hash'], 'left_anti')
2. Но вы получите все четыре строки, поэтому вы хотите, чтобы я после этого нашел проверку дубликатов???
3. left_anti — выдает тот же результат, что и subtract.
Ответ №1:
Использование .exceptAll
(из Spark-2.4 ) вместо .substract
as .exceptAll
сохраняет все дублированные строки, используя df2 в качестве исходного фрейма данных.
From docs:
Return a new DataFrame containing rows in this DataFrame but not in another DataFrame.
This is equivalent to EXCEPT DISTINCT in SQL.
Return a new DataFrame containing rows in this DataFrame but not in another DataFrame while preserving duplicates.
This is equivalent to EXCEPT ALL in SQL.
Example:
From Spark-2.4 :
df2.exceptAll(df1).show(10,False)
# --- --- --- --------------------------------
#|a |b |c |hash |
# --- --- --- --------------------------------
#|5 |2 |3 |747d9c66398e89fbda6570f6bf945ed6|
# --- --- --- --------------------------------
For Spark<2.4:
Нам нужно использовать Spark window row_number functi
для последующего объединения по левому краю, чтобы найти дублированные записи.
Example:
from pyspark.sql.functions import row_number
from pyspark.sql import Window
w=Window.partitionBy(*['a', 'b', 'c', 'hash']).orderBy(lit(1))
df1=df1.withColumn("rn",row_number().over(w))
df2=df2.withColumn("rn",row_number().over(w))
df2.alias("d2").join(df1.alias("d1"),(df1["a"]==df2["a"]) amp; (df1["b"]==df2["b"]) amp; (df1["c"]==df2["c"]) amp; (df1["hash"]==df2["hash"]) amp; (df1["rn"]==df2["rn"]),'left').
filter(col("d1.rn").isNull()).
select("d2.*").
drop("rn").
show()
# --- --- --- --------------------------------
#|a |b |c |hash |
# --- --- --- --------------------------------
#|5 |2 |3 |747d9c66398e89fbda6570f6bf945ed6|
# --- --- --- --------------------------------
Комментарии:
1. Спасибо, все исключения есть в выпуске spark 2.4. Но мы все еще используем версию 2.3.*. Значит, exceptALL не работает? Мы можем сделать это любым другим способом. Ваша идея идеальна, но версия убивает нас.
2. @Rocky1989, пожалуйста, проверьте обновленный ответ для Spark<2.4 ..!