Почему мы используем pyspark UDF, когда функции python быстрее, чем они? (Примечание. Не беспокоясь о командах spark SQL)

#python #pyspark #user-defined-functions

#python #pyspark #определяемые пользователем функции

Вопрос:

У меня есть фрейм данных:

 df = (spark
  .range(0, 10 * 1000 * 1000)
  .withColumn('id', (col('id') / 1000).cast('integer'))
  .withColumn('v', rand()))
  

Вывод:

  --- ------------------- 
| id|                  v|
 --- ------------------- 
|  0|0.05011803459635367|
|  0| 0.6749337782428327|
|  0| 0.9449105904567048|
|  0| 0.9183605955607251|
|  0|  0.648596393346793|
 --- ------------------- 
  

Теперь простое добавление 1 к ‘v’ можно выполнить с помощью функций SQL и UDF.

Если мы игнорируем SQL (наилучшую производительность)

Мы можем создать UDF как:

 @udf("double")
def plus_one(v):
    return v   1
  

и вызовите его:

 df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
  

Время: 16,5 сек

Но вот мой вопрос:

если я НЕ использую udf и напрямую пишу:

 def plus_one(v):
        return v   1
df.withColumn('v', plus_one(df.v)).agg(count(col('v'))).show()
  

Затраченное время — 352 мс

В двух словах, запрос UDF занял ~ 16 секунд, тогда как обычная функция python заняла ~ 350 мс

Для сравнения,

 df.selectExpr("id", "v 1 as v").agg(count(col('v'))).show()
  

Время: 347 мс

Вот моя дилемма:

Если я могу выполнить тот же сценарий с обычной функцией python, которая работает по сравнению со встроенными функциями…

В. Почему мы не используем функцию python напрямую?

Вопрос: Имеет ли значение регистрация UDF, только если мы планируем использовать его внутри SQL как команду?

Должна быть какая-то причина оптимизации, почему мы этого не делаем … или, может быть, что-то связанное с тем, как работает spark cluster?

[ Уже есть ответы на 2 вопроса, но оба они заканчиваются словами «Предпочтительны встроенные функции SQL …» Я сравниваю функцию python с UDF, и это возможно в приложении pyspark. ]

Редактировать: я тоже делал это с pandas_udf:

 @pandas_udf('double')
def vectorized_plus_one(v):
    return v   1

df.withColumn('v', vectorized_plus_one(df.v)).agg(count(col('v'))).show()
  

Время: 5.26 секунд

Я приложил скриншот:

Вывод для добавления 1 к значению — Python funtion (standalone), UDF, SQL

Ответ №1:

Ваш сценарий работает, потому что на самом деле вы не добавляете 1 в python, он добавляется в Java способом, очень похожим на тот, который используется, когда вы делаете это с SQL.

Давайте разделим случай отдельно:

  1. Вы делаете plus_one(df.v) , что равносильно просто передаче df.v 1
  2. Попробуйте ввести df.v 1 свой любимый repl, и вы увидите, что он возвращает object типа Column .
  3. Как это может быть? Column класс __radd__ перезаписал магический метод (наряду с некоторыми другими) и возвращает новый Column экземпляр с инструкцией добавить 1 в указанный столбец.

В заключение: withColumn всегда принимает объекты типа Column в качестве второго аргумента, а трюк с добавлением 1 в ваш столбец — это магия python.

Вот почему он работает быстрее, чем udf и vectorized udf : им нужно запускать процесс python, сериализовать / десериализовать данные (векторизованные udf могут работать быстрее, arrow чтобы избежать сериализации / десериализации), вычислять в более медленном процессе python.

Комментарии:

1. Итак, в долгосрочной перспективе функция python, как есть, наверняка потерпит неудачу, если я свяжу какую-то сложную логику? Спасибо, я могу понять, как просто функция python может привести к ошибке «столбец не повторяется».

2. Да, это произойдет, если вы выполняете манипуляции, которые не возвращают новый Column объект. В общем, это могут сделать только простые арифметические операции и функции из pyspark.sql.functions . Для всего остального вам нужно использовать udfs.