#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
Я хотел бы преобразовать этот фрейм данных PySpark:
df = spark.createDataFrame([
("A", 1),
("A", 2),
("A", 3),
("B", 1),
("B", 2),
("B", 4),
("B", 5)
],
["name", "connect"]
)
df.show()
---- -------
|name|connect|
---- -------
| A| 1|
| A| 2|
| A| 3|
| B| 1|
| B| 2|
| B| 4|
| B| 5|
---- -------
В следующий формат:
df_out = spark.createDataFrame([
("A", "A", 3),
("B", "B", 4),
("A", "B", 2)
],
["name1", "name2", "n_connect"]
)
df_out.show()
----- ----- ---------
|name1|name2|n_connect|
----- ----- ---------
| A| A| 3|
| B| B| 4|
| A| B| 2|
----- ----- ---------
Т.е. я хочу знать, сколько «соединений» имеет каждое имя, и я хочу знать, сколько общих «соединений» существует между каждым именем. Есть ли в Spark какая-либо стандартная функция, которая позволит мне это сделать?
Ответ №1:
Вы можете выполнить самосоединение, объединить идентичные комбинации, то есть A-> B и B-> A, а затем countDistinct connect
для каждой комбинации. ниже мы используем sort_array(array(d1.name, d2.name))
для группировки уникальных комбинаций имен:
from pyspark.sql.functions import countDistinct
df_new = df.alias("d1").join(df.alias("d2"), "connect")
.selectExpr("sort_array(array(d1.name, d2.name)) as names", "d1.connect")
.groupby("names")
.agg(countDistinct("connect").alias("n_connect"))
------ ---------
| names|n_connect|
------ ---------
|[A, A]| 3|
|[B, B]| 4|
|[A, B]| 2|
------ ---------
df_new.selectExpr("names[0] as name1", "names[1] as name2", "n_connect").show()
----- ----- ---------
|name1|name2|n_connect|
----- ----- ---------
| A| A| 3|
| B| B| 4|
| A| B| 2|
----- ----- ---------
вы можете сделать то же самое с pandas:
pdf = df.toPandas()
pdf.merge(pdf, on="connect")
.assign(names=lambda x: [tuple(sorted(z)) for z in zip(x.name_x, x.name_y)])
.groupby('names')["connect"].nunique()
#Out[*]:
#names
#(A, A) 3
#(A, B) 2
#(B, B) 4
Согласно предложению @anky, используйте np.sort() для сортировки имен:
import numpy as np
names = ["name_x", "name_y"]
pdf1 = pdf.merge(pdf, on="connect")
pdf1[names] = np.sort(pdf1[names],1)
pdf1.groupby(names)["connect"].nunique().reset_index()
# name_x name_y connect
#0 A A 3
#1 A B 2
#2 B B 4