#apache-spark #pyspark #apache-spark-sql #window
#apache-spark #pyspark #apache-spark-sql #окно
Вопрос:
Мне нужно удалить строки, в которых для одного и того же идентификатора, p_id и key_id отсутствует обратная связь, но у нас есть некоторые отзывы.
ввод
id p_id key_id feedback
1 p1 k1 happy
1 p1 k1 sad
1 p1 k2 sad
1 p1 k2
1 p2 k3
2 p1 k3 sad
вывод
id p_id key_id feedback
1 p1 k1 happy
1 p1 k1 sad
1 p1 k2 sad
1 p2 k3
2 p1 k3 sad
Как я могу добиться этого в pyspark?
Ответ №1:
Я бы создал новый столбец с именем min_length
и фильтровал по этому столбцу и feedback
столбцу:
import pyspark.sql.functions as F
import pyspark.sql.window.Window as W
df = df.withColumn('min_length',
F.min(F.length(F.trim(F.col('feedback'))))
.over(W.partitionBy('id', 'p_id', 'key_id'))
)
cond = (F.col('min_length') != 0) amp; (F.length(F.trim(F.col('feedback'))) == 0)
df.filter(~cond)
Обрезки просто удаляют все пробелы в столбце feedback
Комментарии:
1. Спасибо, я немного изменил значение, чтобы сделать его равным 0, если значение равно None, используя coalesce, иначе он пропускал эту проверку при выполнении длины
Ответ №2:
Вы можете добавить столбец (назовем его num_feedbacks) для каждого ключа ([id, p_id, key_id] ), который подсчитывает, сколько отзывов для этого ключа у вас есть в DataFrame. Затем вы можете фильтровать свой фрейм данных, сохраняя только те строки, в которых у вас есть обратная связь (обратная связь не равна нулю), или у вас нет обратной связи для этого конкретного ключа.
Вот пример кода:
key = ['id', 'p_id', 'key_id']
num_feedbacks = df.filter(col('feedback')!="")
.groupby(key).agg(F.count('feedback').alias('num_feedbacks'))
df = df.join(num_feedbacks, on=key, how='left')
.filter((col('feedback')!="") | (col('num_feedbacks').isNull()))
.drop('num_feedbacks')
Что дает вам:
--- ---- ------ --------
| id|p_id|key_id|feedback|
--- ---- ------ --------
| 2| p1| k3| sad|
| 1| p1| k1| sad|
| 1| p1| k1| happy|
| 1| p1| k2| sad|
| 1| p2| k3| |
--- ---- ------ --------