Как найти многорежимный столбец массива в Pyspark

#python #apache-spark #pyspark #apache-spark-sql

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

Я хочу найти режим столбца задачи в этом фрейме данных:

  ----- ----------------------------------------- 
|  id |              task                       |
 ----- ----------------------------------------- 
| 101 |   [person1, person1, person3]           |
| 102 |   [person1, person2, person3]           |
| 103 |           null                          |
| 104 |   [person1, person2]                    |
| 105 |   [person1, person1, person2, person2]  |
| 106 |           null                          |
 ----- ----------------------------------------- 
  

Если существует несколько режимов, я хочу отобразить их все.
Может кто-нибудь, пожалуйста, помочь мне получить этот вывод:

  ----- ----------------------------------------- --------------------------- 
|  id |              task                       |           mode            |
 ----- ----------------------------------------- --------------------------- 
| 101 |   [person1, person1, person3]           |[person1]                  |
| 102 |   [person1, person2, person3]           |[person1, person2, person3]|
| 103 |           null                          |[]                         |
| 104 |   [person1, person2]                    |[person1, person2]         |
| 105 |   [person1, person1, person2, person2]  |[person1, person2]         |
| 106 |           null                          |[]                         |
 ----- ----------------------------------------- --------------------------- 
  

Это мой первый вопрос здесь. Любая помощь или подсказка приветствуется. Спасибо.

Комментарии:

1. при использовании spark > = 2.4.0 вы можете использовать встроенную функцию array_intersect , т.Е.: df.withColumn('intersection', array_intersect(df['task'], df['mode']))

Ответ №1:

Я не вижу причин использовать UDF для этого случая, spark2.4 поскольку мы можем использовать higher order functions для получения желаемого результата. UDF, использующий счетчик, будет очень медленным для больших данных по сравнению с функциями более высокого порядка:

 from pyspark.sql import functions as F

df
  .withColumn("most_common", F.expr("""transform(array_distinct(values),
                                      x-> array(aggregate(values, 0,(acc,t)->acc IF(t=x,1,0)),x))"""))
  .withColumn("most_common", F.expr("""transform(filter(most_common, x-> x[0]==array_max(most_common)[0]),y-> y[1])"""))
  .show(truncate=False)

# --- ---------------------------------------- --------------------------- 
#|id |values                                  |most_common                |
# --- ---------------------------------------- --------------------------- 
#|1  |[good, good, good, bad, bad, good, good]|[good]                     |
#|2  |[bad, badd, good, bad,, good, bad, good]|[bad, good]                |
#|2  |[person1, person2, person3]             |[person1, person2, person3]|
#|2  |null                                    |null                       |
# --- ---------------------------------------- --------------------------- 
  

Ответ №2:

Использование Spark 2.3:

Вы можете решить это с помощью пользовательского UDF . Для получения значений нескольких режимов я использую Counter . Я использую except блок в UDF для нулевых значений в вашем task столбце.
(Для пользователей Python 3.8 есть statistics.multimode() встроенная функция, которую вы можете использовать)

Ваш фрейм данных:

 from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *

schema = StructType([StructField("id", IntegerType()), StructField("task", ArrayType(StringType()))])
data = [[101, ["person1", "person1", "person3"]], [102, ["person1", "person2", "person3"]], [103, None], [104, ["person1", "person2"]], [105, ["person1", "person1", "person2", "person2"]], [106, None]]

df = spark.createDataFrame(data,schema=schema)
  

Операция:

 from collections import Counter

def get_multi_mode_list(input_array):
    multi_mode = []
    counter_var = Counter(input_array)  
    try:
        temp = counter_var.most_common(1)[0][1]
    except:
        temp = counter_var.most_common(1)
    for i in counter_var: 
        if input_array.count(i) == temp: 
            multi_mode.append(i)
    return(list(set(multi_mode)))


get_multi_mode_list_udf = F.udf(get_multi_mode_list, ArrayType(StringType()))

df.withColumn("multi_mode", get_multi_mode_list_udf(col("task"))).show(truncate=False)
  

Вывод:

  --- ------------------------------------ --------------------------- 
|id |task                                |multi_mode                 |
 --- ------------------------------------ --------------------------- 
|101|[person1, person1, person3]         |[person1]                  |
|102|[person1, person2, person3]         |[person2, person3, person1]|
|103|null                                |[]                         |
|104|[person1, person2]                  |[person2, person1]         |
|105|[person1, person1, person2, person2]|[person2, person1]         |
|106|null                                |[]                         |
 --- ------------------------------------ --------------------------- 
  

Комментарии:

1. Большое вам спасибо за вашу помощь! Это именно то, что я искал.

Ответ №3:

 from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from collections import Counter
from itertools import groupby
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql import *
from collections import *
from pyspark.sql.functions import udf,explode
from pyspark.sql.types import StringType
table_schema = StructType([
                     StructField('key2', IntegerType(), True),
                 
                     StructField('list6', ArrayType(StringType()), False)
                     ])
df= spark.createDataFrame([
( 101 ,   ["person1", "person1", "person3"] ),
(102 ,   ["person1", "person2", "person3"]   ),
( 103 ,None  ),
( 104  ,  ["person1", "person2"]),
(105 ,   ["person1", "person1", "person2", "person2"])] 
,["id","List"])
def mode(list1):
    res = []
    if(list1 is None or len(list1)==0):
        return []
    test_list1 = Counter(list1)  
    temp = test_list1.most_common(1)[0][1]
    for ele in list1:
       if list1.count(ele) == temp:
           res.append(ele)
    return list(set(res))




df.createOrReplaceTempView("A")
spark.udf.register("mode", mode,ArrayType(StringType()))
spark.sql("select id,list,mode(list)func from A").show(truncate=False)
  

Комментарии:

1. Спасибо за ответ. Но я получаю "IndexError: list index out of range" ошибку, когда применяю ее к своему фрейму данных. Для id = 103 у вас есть пустой список, но у меня есть null. Я думаю, что в этом и заключается ошибка

2. @Whimsy Я отредактировал код, теперь он не будет генерировать исключение