найти максимальное значение в списке во фрейме данных Spark

#pyspark

#pyspark

Вопрос:

Я хочу найти максимальное значение в списке. Как вы делаете это в pyspark?

 df = spark.createDataFrame([(1, [4,1]), (2, [4,5]), (3, [4,0])], ["A", "B"])
df.show()

 --- ------ 
|  A|     B|
 --- ------ 
|  1|[4, 1]|
|  2|[4, 5]|
|  3|[4, 0]|
 --- ------ 
  

В этом следующем примере, как мне найти в каждой строке максимальное значение из списка в столбце B. Итак, для:

  • строка 1 —> 4
  • строка 2 —> 5
  • строка 3 —> 4

Комментарии:

1. как насчет использования array_max : df.select("A",array_max("B").alias("B")).show() ?

Ответ №1:

Вы можете использовать aggregate функцию.

 df = spark.createDataFrame([(1, [4,1, 4, 54,4, 2,2, 7,14, 23,74,53]), (2, [4,5, 11, 3,45, 34, 2,3, 4]), (3, [4,0, 32, 23,23, 5,23,2 ,37,8, 6,54, 54])], ["A", "B"])

from pyspark.sql.functions import *

df.withColumn('Max', expr('aggregate(B, 0L, (a, b) -> if(a < b, b, a))')).show(3, False)

 --- ---------------------------------------------- --- 
|A  |B                                             |Max|
 --- ---------------------------------------------- --- 
|1  |[4, 1, 4, 54, 4, 2, 2, 7, 14, 23, 74, 53]     |74 |
|2  |[4, 5, 11, 3, 45, 34, 2, 3, 4]                |45 |
|3  |[4, 0, 32, 23, 23, 5, 23, 2, 37, 8, 6, 54, 54]|54 |
 --- ---------------------------------------------- --- 
  

Имейте в виду, что 0L это длинный тип, и вы должны сопоставить тип с элементом массива.

Комментарии:

1. Может ли это быть более общим? Дело в том, что список может различаться по длине и может быть длинным.

2. оно уже является общим, a и b не равны 4 и 1, но a является максимальным до тех пор, пока предыдущее, а b — текущее значение и итерация.

3. Я получаю сообщение об ошибке. Исключение ParseException: u» nextraneous input ‘>’ ожидание {‘(‘, ‘ВЫБРАТЬ’, ‘ИЗ’, ‘ДОБАВИТЬ’, ‘КАК’, ‘ВСЕ’, ‘РАЗНЫЕ’, ‘ГДЕ’, ‘ГРУППА’, ‘ПО’, ‘ГРУППИРОВКА’, ‘НАБОРЫ’, ‘КУБ’, ‘СВЕРТКА’, ‘ПОРЯДОК’, ‘НАЛИЧИЕ’, ‘ОГРАНИЧЕНИЕ’, ‘НА’, ‘ИЛИ’, … }(строка 1, поз. 25)n n== SQL ==naggregate(B, 0L, (a, b) -> if(a < b, b, a))n————————-^^^ n»

4. Какая у вас версия Spark?

5. нет версии Spark 2.7. Вы должны проверить это, функция будет работать как минимум 2.4.0.

Ответ №2:

кажется, это работает. Хотя я точно не знаю, что он делает 🙂

 def my_max(s):
    return max(s)

from pyspark.sql.types import DateType
my_max2 = F.udf(my_max, DateType())


df.withColumn("mymax", my_max2("B"))