Как мне найти наиболее часто встречающийся элемент в списке в pyspark?

#list #pyspark #frequency

Вопрос:

У меня есть фрейм данных pyspark с двумя столбцами, идентификатором и элементами. В столбце «Элементы» есть элемент списка. Это выглядит так,

 ID | Elements
_______________________________________
X  |[Element5, Element1, Element5]
Y  |[Element Unknown, Element Unknown, Element_Z]
 

Я хочу сформировать столбец с наиболее частым элементом в столбце «Элементы». Вывод должен выглядеть так,

 ID | Elements                                           | Output_column 
__________________________________________________________________________
X  |[Element5, Element1, Element5]                      | Element5
Y  |[Element Unknown, Element Unknown, Element_Z]       | Element Unknown 
 

Как я могу это сделать с помощью pyspark?

Заранее спасибо.

Ответ №1:

Мы можем использовать функции более высокого порядка здесь (доступно в spark 2.4 ).

  1. Сначала используйте transform и aggregate получите значения для каждого отдельного значения в массиве.
  2. Затем отсортируйте массив структур по убыванию, а затем получите первый элемент.

 from pyspark.sql import functions as F
temp = (df.withColumn("Dist",F.array_distinct("Elements"))
              .withColumn("Counts",F.expr("""transform(Dist,x->
                           aggregate(Elements,0,(acc,y)-> IF (y=x, acc 1,acc))
                                      )"""))
              .withColumn("Map",F.arrays_zip("Dist","Counts")
              )).drop("Dist","Counts")
out = temp.withColumn("Output_column",
                    F.expr("""element_at(array_sort(Map,(first,second)->
         CASE WHEN first['Counts']>second['Counts'] THEN -1 ELSE 1 END),1)['Dist']"""))
 

Выход:

Обратите внимание, что я добавил пустой массив для идентификатора z для тестирования. Также вы можете удалить столбец Map , добавив .drop("Map") его в вывод

 out.show(truncate=False)

 --- --------------------------------------------- -------------------------------------- --------------- 
|ID |Elements                                     |Map                                   |Output_column  |
 --- --------------------------------------------- -------------------------------------- --------------- 
|X  |[Element5, Element1, Element5]               |[{Element5, 2}, {Element1, 1}]        |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|[{Element Unknown, 2}, {Element_Z, 1}]|Element Unknown|
|Z  |[]                                           |[]                                    |null           |
 --- --------------------------------------------- -------------------------------------- --------------- 
 

Для более низких версий вы можете использовать udf с режимом статистики:

 from pyspark.sql import functions as F,types as T
from statistics import mode
u = F.udf(lambda x: mode(x) if len(x)>0 else None,T.StringType())

df.withColumn("Output",u("Elements")).show(truncate=False)
 --- --------------------------------------------- --------------- 
|ID |Elements                                     |Output         |
 --- --------------------------------------------- --------------- 
|X  |[Element5, Element1, Element5]               |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|Element Unknown|
|Z  |[]                                           |null           |
 --- --------------------------------------------- ---------------