Оптимизация Pyspark UDF для больших данных

#apache-spark #pyspark #apache-spark-sql #user-defined-functions

#apache-spark #pyspark #apache-spark-sql #определяемые пользователем функции

Вопрос:

Я пытаюсь оптимизировать этот код, который создает фиктивный код, когда значение столбца (фрейма данных pyspark) находится в [categories] .

Когда запуск выполняется на 100 тыс. строк, для запуска требуется около 30 секунд. В моем случае у меня около 20 миллионов строк, что займет много времени.

 def create_dummy(dframe,col_name,top_name,categories,**options):
    lst_tmp_col = []
    if 'lst_tmp_col' in options:
        lst_tmp_col = options["lst_tmp_col"]
    udf = UserDefinedFunction(lambda x: 1 if x in categories else 0, IntegerType())
    dframe  = dframe.withColumn(str(top_name), udf(col(col_name))).cache()
    dframe = dframe.select(lst_tmp_col  [str(top_name)])
    return dframe 
 

Другими словами, как мне оптимизировать эту функцию и сократить общее время в зависимости от объема моих данных? И как убедиться, что эта функция не выполняет итерацию по моим данным?

Ценю ваши предложения. Спасибо

Ответ №1:

Вам не нужен UDF для кодирования категорий. Вы можете использовать isin :

 import pyspark.sql.functions as F

def create_dummy(dframe, col_name, top_name, categories, **options):
    lst_tmp_col = []
    if 'lst_tmp_col' in options:
        lst_tmp_col = options["lst_tmp_col"]
    dframe = dframe.withColumn(str(top_name), F.col(col_name).isin(categories).cast("int")).cache()
    dframe = dframe.select(lst_tmp_col   [str(top_name)])
    return dframe