поиск общих слов в строках с помощью pyspark / pandas

#pandas #apache-spark #pyspark #apache-spark-sql

#pandas #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть текстовый файл, как показано ниже, с разделителем каналов

 person_id | category | notes
1|A|He bought cat
1|A|He bought dog
1|B|He has hen
2|A|Switzerland Australia
2|A|Australia
 

Я хочу сгруппировать по person_id и категории и найти только те слова, которые повторяются во всей строке

ожидаемый результат

 1|A|He bought
1|B|he has hen
2|A|Australia
 

Я купил количество слов для каждого, используя group по person_id и category, я застрял, получая результат

Я получил количество слов, используя group by, как показано ниже, используя количество слов rdd и spark-sql

 person_id | category | notes
1|A|He (2) bought(2) cat(1) dog(1)
1|B|He(1) has(1) hen(1)
2|A|Switzerland(1) Australia(2)
 

Комментарии:

1. вы ищете решение pandas или pyspark? они очень разные. Также, что вы пробовали до сих пор?

2. Должны ли слова быть в слове, как указано в notes столбце?

3. Что, если бы слова шли в другом порядке? Например, в person_id=1, catgory= A, notes =»Я купил кошку, которая ему нравится». Как насчет чувствительности к регистру?

4. Здравствуйте, мы можем игнорировать чувствительность к регистру и просто получать общие слова во всех строках

5. все приведенные ниже ответы помогли мне, спасибо всем

Ответ №1:

Вы можете добиться этого с помощью функций Spark arrays:

  1. разделите столбец notes , чтобы получить массив слов
  2. сгруппировать по person_id и category собрать список слов
  3. отфильтруйте результирующий массив, проверив, существует ли он во всех собранных подмассивах (т.Е. словах), используя функцию более высокого порядка filter
 import pyspark.sql.functions as F

df1 = df.withColumn("notes", F.split("notes", " ")) 
    .groupBy("person_id", "category") 
    .agg(F.collect_list(F.col("notes")).alias("notes")) 
    .withColumn("w", F.array_distinct(F.flatten("notes"))) 
    .withColumn("notes", F.array_join(F.expr("filter(w, x -> size(filter(notes, y -> array_contains(y, x))) = size(notes))"), " ")) 
    .drop("w")

df1.show()
# --------- -------- ---------- 
#|person_id|category|notes     |
# --------- -------- ---------- 
#|1        |A       |He bought |
#|1        |B       |He has hen|
#|2        |A       |Australia |
# --------- -------- ---------- 
 

Ответ №2:

Другой вариант — определить UDF, который найдет пересечение массива строк:

 from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
data = [
    {"person_id": 1, "category": "A", "notes": "He bought cat"},
    {"person_id": 1, "category": "A", "notes": "He bought dog"},
    {"person_id": 1, "category": "B", "notes": "He has hen"},
    {"person_id": 2, "category": "A", "notes": "Switzerland Australia"},
    {"person_id": 2, "category": "A", "notes": "Australia"},
]


def common(x):
    l = [i.split() for i in x]
    return " ".join(sorted(set.intersection(*map(set, l)), key=l[0].index))

df = spark.createDataFrame(data)
df = df.groupBy(["person_id", "category"]).agg(F.collect_list("notes").alias("b"))
df = df.withColumn("result", F.udf(common)(F.col("b")))
 

Результат:

  --------- -------- ----------                                                  
|person_id|category|result    |
 --------- -------- ---------- 
|1        |A       |He bought |
|1        |B       |He has hen|
|2        |A       |Australia |
 --------- -------- ---------- 
 

Ответ №3:

Вы можете разбить notes строку на слова, а затем разделить слова. Наконец, count количество notes для каждого person_id, category и количество слов для каждого person_id, category . Если их количество равно, постройте слова по collect_list .

 
from pyspark.sql import functions as F
from pyspark.sql import Window

data = [(1, "A", "He bought cat"),
(1, "A", "He bought dog"),
(1, "B", "He has hen"),
(2, "A", "Switzerland Australia"),
(2, "A", "Australia"),]

df = spark.createDataFrame(data, ("person_id", "category", "notes", ))

window_spec_category = Window.partitionBy("person_id", "category")

df_word = df.withColumn("category_count", F.count("*").over(window_spec_category))
            .select("person_id", "category", F.posexplode(F.split(F.col("notes"), " ")).alias("pos", "word"))

window_spec_word = Window.partitionBy("person_id", "category", "word")

matching_words = df_word.withColumn("word_count", F.count("*").over(window_spec_word))
        .withColumn("rn", F.row_number().over(window_spec_word.orderBy(F.lit(None))))
        .filter(F.col("word_count") == F.col("category_count"))
        .filter(F.col("rn") == F.lit(1))
        .drop("rn")

window_spec_collect = window_spec_category.orderBy("pos").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

matching_words.withColumn("result", F.concat_ws(" ", F.collect_list("word").over(window_spec_collect)))
              .withColumn("rn", F.row_number().over(window_spec_category.orderBy(F.lit(None))))
              .filter(F.col("rn") == F.lit(1))
              .select("person_id", "category", "result")
              .show()
 

Вывод

  --------- -------- ---------- 
|person_id|category|    result|
 --------- -------- ---------- 
|        1|       A| He bought|
|        1|       B|He has hen|
|        2|       A| Australia|
 --------- -------- ----------