#list #pyspark #frequency
Вопрос:
У меня есть фрейм данных pyspark с двумя столбцами, идентификатором и элементами. В столбце «Элементы» есть элемент списка. Это выглядит так,
ID | Elements
_______________________________________
X |[Element5, Element1, Element5]
Y |[Element Unknown, Element Unknown, Element_Z]
Я хочу сформировать столбец с наиболее частым элементом в столбце «Элементы». Вывод должен выглядеть так,
ID | Elements | Output_column
__________________________________________________________________________
X |[Element5, Element1, Element5] | Element5
Y |[Element Unknown, Element Unknown, Element_Z] | Element Unknown
Как я могу это сделать с помощью pyspark?
Заранее спасибо.
Ответ №1:
Мы можем использовать функции более высокого порядка здесь (доступно в spark 2.4 ).
- Сначала используйте
transform
иaggregate
получите значения для каждого отдельного значения в массиве. - Затем отсортируйте массив структур по убыванию, а затем получите первый элемент.
from pyspark.sql import functions as F
temp = (df.withColumn("Dist",F.array_distinct("Elements"))
.withColumn("Counts",F.expr("""transform(Dist,x->
aggregate(Elements,0,(acc,y)-> IF (y=x, acc 1,acc))
)"""))
.withColumn("Map",F.arrays_zip("Dist","Counts")
)).drop("Dist","Counts")
out = temp.withColumn("Output_column",
F.expr("""element_at(array_sort(Map,(first,second)->
CASE WHEN first['Counts']>second['Counts'] THEN -1 ELSE 1 END),1)['Dist']"""))
Выход:
Обратите внимание, что я добавил пустой массив для идентификатора z для тестирования. Также вы можете удалить столбец Map
, добавив .drop("Map")
его в вывод
out.show(truncate=False)
--- --------------------------------------------- -------------------------------------- ---------------
|ID |Elements |Map |Output_column |
--- --------------------------------------------- -------------------------------------- ---------------
|X |[Element5, Element1, Element5] |[{Element5, 2}, {Element1, 1}] |Element5 |
|Y |[Element Unknown, Element Unknown, Element_Z]|[{Element Unknown, 2}, {Element_Z, 1}]|Element Unknown|
|Z |[] |[] |null |
--- --------------------------------------------- -------------------------------------- ---------------
Для более низких версий вы можете использовать udf с режимом статистики:
from pyspark.sql import functions as F,types as T
from statistics import mode
u = F.udf(lambda x: mode(x) if len(x)>0 else None,T.StringType())
df.withColumn("Output",u("Elements")).show(truncate=False)
--- --------------------------------------------- ---------------
|ID |Elements |Output |
--- --------------------------------------------- ---------------
|X |[Element5, Element1, Element5] |Element5 |
|Y |[Element Unknown, Element Unknown, Element_Z]|Element Unknown|
|Z |[] |null |
--- --------------------------------------------- ---------------