#python #pyspark #user-defined-functions
#python #pyspark #определяемые пользователем функции
Вопрос:
У меня есть фрейм данных:
df = (spark
.range(0, 10 * 1000 * 1000)
.withColumn('id', (col('id') / 1000).cast('integer'))
.withColumn('v', rand()))
Вывод:
--- -------------------
| id| v|
--- -------------------
| 0|0.05011803459635367|
| 0| 0.6749337782428327|
| 0| 0.9449105904567048|
| 0| 0.9183605955607251|
| 0| 0.648596393346793|
--- -------------------
Теперь простое добавление 1 к ‘v’ можно выполнить с помощью функций SQL и UDF.
Если мы игнорируем SQL (наилучшую производительность)
Мы можем создать UDF как:
@udf("double")
def plus_one(v):
return v 1
и вызовите его:
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
Время: 16,5 сек
Но вот мой вопрос:
если я НЕ использую udf и напрямую пишу:
def plus_one(v):
return v 1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
Затраченное время — 352 мс
В двух словах, запрос UDF занял ~ 16 секунд, тогда как обычная функция python заняла ~ 350 мс
Для сравнения,
df.selectExpr("id", "v 1 as v").agg(count(col('v'))).show()
Время: 347 мс
Вот моя дилемма:
Если я могу выполнить тот же сценарий с обычной функцией python, которая работает по сравнению со встроенными функциями…
В. Почему мы не используем функцию python напрямую?
Вопрос: Имеет ли значение регистрация UDF, только если мы планируем использовать его внутри SQL как команду?
Должна быть какая-то причина оптимизации, почему мы этого не делаем … или, может быть, что-то связанное с тем, как работает spark cluster?
[ Уже есть ответы на 2 вопроса, но оба они заканчиваются словами «Предпочтительны встроенные функции SQL …» Я сравниваю функцию python с UDF, и это возможно в приложении pyspark. ]
Редактировать: я тоже делал это с pandas_udf:
@pandas_udf('double')
def vectorized_plus_one(v):
return v 1
df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
Время: 5.26 секунд
Я приложил скриншот:
Вывод для добавления 1 к значению — Python funtion (standalone), UDF, SQL
Ответ №1:
Ваш сценарий работает, потому что на самом деле вы не добавляете 1 в python, он добавляется в Java способом, очень похожим на тот, который используется, когда вы делаете это с SQL.
Давайте разделим случай отдельно:
- Вы делаете
plus_one(df.v)
, что равносильно просто передачеdf.v 1
- Попробуйте ввести
df.v 1
свой любимый repl, и вы увидите, что он возвращает object типаColumn
. - Как это может быть?
Column
класс__radd__
перезаписал магический метод (наряду с некоторыми другими) и возвращает новыйColumn
экземпляр с инструкцией добавить 1 в указанный столбец.
В заключение: withColumn
всегда принимает объекты типа Column
в качестве второго аргумента, а трюк с добавлением 1 в ваш столбец — это магия python.
Вот почему он работает быстрее, чем udf
и vectorized udf
: им нужно запускать процесс python, сериализовать / десериализовать данные (векторизованные udf могут работать быстрее, arrow
чтобы избежать сериализации / десериализации), вычислять в более медленном процессе python.
Комментарии:
1. Итак, в долгосрочной перспективе функция python, как есть, наверняка потерпит неудачу, если я свяжу какую-то сложную логику? Спасибо, я могу понять, как просто функция python может привести к ошибке «столбец не повторяется».
2. Да, это произойдет, если вы выполняете манипуляции, которые не возвращают новый
Column
объект. В общем, это могут сделать только простые арифметические операции и функции изpyspark.sql.functions
. Для всего остального вам нужно использовать udfs.