#pyspark
#pyspark
Вопрос:
Я хочу найти максимальное значение в списке. Как вы делаете это в pyspark?
df = spark.createDataFrame([(1, [4,1]), (2, [4,5]), (3, [4,0])], ["A", "B"])
df.show()
--- ------
| A| B|
--- ------
| 1|[4, 1]|
| 2|[4, 5]|
| 3|[4, 0]|
--- ------
В этом следующем примере, как мне найти в каждой строке максимальное значение из списка в столбце B. Итак, для:
- строка 1 —> 4
- строка 2 —> 5
- строка 3 —> 4
Комментарии:
1. как насчет использования
array_max
:df.select("A",array_max("B").alias("B")).show()
?
Ответ №1:
Вы можете использовать aggregate
функцию.
df = spark.createDataFrame([(1, [4,1, 4, 54,4, 2,2, 7,14, 23,74,53]), (2, [4,5, 11, 3,45, 34, 2,3, 4]), (3, [4,0, 32, 23,23, 5,23,2 ,37,8, 6,54, 54])], ["A", "B"])
from pyspark.sql.functions import *
df.withColumn('Max', expr('aggregate(B, 0L, (a, b) -> if(a < b, b, a))')).show(3, False)
--- ---------------------------------------------- ---
|A |B |Max|
--- ---------------------------------------------- ---
|1 |[4, 1, 4, 54, 4, 2, 2, 7, 14, 23, 74, 53] |74 |
|2 |[4, 5, 11, 3, 45, 34, 2, 3, 4] |45 |
|3 |[4, 0, 32, 23, 23, 5, 23, 2, 37, 8, 6, 54, 54]|54 |
--- ---------------------------------------------- ---
Имейте в виду, что 0L
это длинный тип, и вы должны сопоставить тип с элементом массива.
Комментарии:
1. Может ли это быть более общим? Дело в том, что список может различаться по длине и может быть длинным.
2. оно уже является общим, a и b не равны 4 и 1, но a является максимальным до тех пор, пока предыдущее, а b — текущее значение и итерация.
3. Я получаю сообщение об ошибке. Исключение ParseException: u» nextraneous input ‘>’ ожидание {‘(‘, ‘ВЫБРАТЬ’, ‘ИЗ’, ‘ДОБАВИТЬ’, ‘КАК’, ‘ВСЕ’, ‘РАЗНЫЕ’, ‘ГДЕ’, ‘ГРУППА’, ‘ПО’, ‘ГРУППИРОВКА’, ‘НАБОРЫ’, ‘КУБ’, ‘СВЕРТКА’, ‘ПОРЯДОК’, ‘НАЛИЧИЕ’, ‘ОГРАНИЧЕНИЕ’, ‘НА’, ‘ИЛИ’, … }(строка 1, поз. 25)n n== SQL ==naggregate(B, 0L, (a, b) -> if(a < b, b, a))n————————-^^^ n»
4. Какая у вас версия Spark?
5. нет версии Spark 2.7. Вы должны проверить это, функция будет работать как минимум 2.4.0.
Ответ №2:
кажется, это работает. Хотя я точно не знаю, что он делает 🙂
def my_max(s):
return max(s)
from pyspark.sql.types import DateType
my_max2 = F.udf(my_max, DateType())
df.withColumn("mymax", my_max2("B"))