#python #apache-spark #pyspark #apache-spark-sql
#python #apache-spark #pyspark #apache-spark-sql
Вопрос:
Я хочу найти режим столбца задачи в этом фрейме данных:
----- -----------------------------------------
| id | task |
----- -----------------------------------------
| 101 | [person1, person1, person3] |
| 102 | [person1, person2, person3] |
| 103 | null |
| 104 | [person1, person2] |
| 105 | [person1, person1, person2, person2] |
| 106 | null |
----- -----------------------------------------
Если существует несколько режимов, я хочу отобразить их все.
Может кто-нибудь, пожалуйста, помочь мне получить этот вывод:
----- ----------------------------------------- ---------------------------
| id | task | mode |
----- ----------------------------------------- ---------------------------
| 101 | [person1, person1, person3] |[person1] |
| 102 | [person1, person2, person3] |[person1, person2, person3]|
| 103 | null |[] |
| 104 | [person1, person2] |[person1, person2] |
| 105 | [person1, person1, person2, person2] |[person1, person2] |
| 106 | null |[] |
----- ----------------------------------------- ---------------------------
Это мой первый вопрос здесь. Любая помощь или подсказка приветствуется. Спасибо.
Комментарии:
1. при использовании spark > = 2.4.0 вы можете использовать встроенную функцию array_intersect , т.Е.:
df.withColumn('intersection', array_intersect(df['task'], df['mode']))
Ответ №1:
Я не вижу причин использовать UDF
для этого случая, spark2.4
поскольку мы можем использовать higher order functions
для получения желаемого результата. UDF, использующий счетчик, будет очень медленным для больших данных по сравнению с функциями более высокого порядка:
from pyspark.sql import functions as F
df
.withColumn("most_common", F.expr("""transform(array_distinct(values),
x-> array(aggregate(values, 0,(acc,t)->acc IF(t=x,1,0)),x))"""))
.withColumn("most_common", F.expr("""transform(filter(most_common, x-> x[0]==array_max(most_common)[0]),y-> y[1])"""))
.show(truncate=False)
# --- ---------------------------------------- ---------------------------
#|id |values |most_common |
# --- ---------------------------------------- ---------------------------
#|1 |[good, good, good, bad, bad, good, good]|[good] |
#|2 |[bad, badd, good, bad,, good, bad, good]|[bad, good] |
#|2 |[person1, person2, person3] |[person1, person2, person3]|
#|2 |null |null |
# --- ---------------------------------------- ---------------------------
Ответ №2:
Использование Spark 2.3:
Вы можете решить это с помощью пользовательского UDF
. Для получения значений нескольких режимов я использую Counter
. Я использую except
блок в UDF для нулевых значений в вашем task
столбце.
(Для пользователей Python 3.8 есть statistics.multimode()
встроенная функция, которую вы можете использовать)
Ваш фрейм данных:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *
schema = StructType([StructField("id", IntegerType()), StructField("task", ArrayType(StringType()))])
data = [[101, ["person1", "person1", "person3"]], [102, ["person1", "person2", "person3"]], [103, None], [104, ["person1", "person2"]], [105, ["person1", "person1", "person2", "person2"]], [106, None]]
df = spark.createDataFrame(data,schema=schema)
Операция:
from collections import Counter
def get_multi_mode_list(input_array):
multi_mode = []
counter_var = Counter(input_array)
try:
temp = counter_var.most_common(1)[0][1]
except:
temp = counter_var.most_common(1)
for i in counter_var:
if input_array.count(i) == temp:
multi_mode.append(i)
return(list(set(multi_mode)))
get_multi_mode_list_udf = F.udf(get_multi_mode_list, ArrayType(StringType()))
df.withColumn("multi_mode", get_multi_mode_list_udf(col("task"))).show(truncate=False)
Вывод:
--- ------------------------------------ ---------------------------
|id |task |multi_mode |
--- ------------------------------------ ---------------------------
|101|[person1, person1, person3] |[person1] |
|102|[person1, person2, person3] |[person2, person3, person1]|
|103|null |[] |
|104|[person1, person2] |[person2, person1] |
|105|[person1, person1, person2, person2]|[person2, person1] |
|106|null |[] |
--- ------------------------------------ ---------------------------
Комментарии:
1. Большое вам спасибо за вашу помощь! Это именно то, что я искал.
Ответ №3:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from collections import Counter
from itertools import groupby
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql import *
from collections import *
from pyspark.sql.functions import udf,explode
from pyspark.sql.types import StringType
table_schema = StructType([
StructField('key2', IntegerType(), True),
StructField('list6', ArrayType(StringType()), False)
])
df= spark.createDataFrame([
( 101 , ["person1", "person1", "person3"] ),
(102 , ["person1", "person2", "person3"] ),
( 103 ,None ),
( 104 , ["person1", "person2"]),
(105 , ["person1", "person1", "person2", "person2"])]
,["id","List"])
def mode(list1):
res = []
if(list1 is None or len(list1)==0):
return []
test_list1 = Counter(list1)
temp = test_list1.most_common(1)[0][1]
for ele in list1:
if list1.count(ele) == temp:
res.append(ele)
return list(set(res))
df.createOrReplaceTempView("A")
spark.udf.register("mode", mode,ArrayType(StringType()))
spark.sql("select id,list,mode(list)func from A").show(truncate=False)
Комментарии:
1. Спасибо за ответ. Но я получаю
"IndexError: list index out of range"
ошибку, когда применяю ее к своему фрейму данных. Для id = 103 у вас есть пустой список, но у меня есть null. Я думаю, что в этом и заключается ошибка2. @Whimsy Я отредактировал код, теперь он не будет генерировать исключение