Как постобработать результаты Spark SQL без использования UDF

#apache-spark #pyspark #apache-spark-sql #pyspark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

Я прочитал

https://medium.com/teads-engineering/spark-performance-tuning-from-the-trenches-7cbde521cf60

Предлагается не использовать UDF для экономии затрат на десериализацию / сериализацию.

В моем случае я выполнил запрос, подобный этому

 select MYFUN(f1, f2, ...) 
from A ...
  

Я использую MYFUN для постобработки результатов запроса строка за строкой, например, отправки их в другую службу.

 def my_fun(f1, f2, ...):
   service.send(f1, f2, ...)

session.udf.register('MYFUN', my_fun)
  

Без использования UDF я могу захотеть сохранить результаты запроса во фрейме данных Python или в таблице Parque в hdfs, затем считывать с помощью фрейма данных и обрабатывать фреймы данных один за другим.

Проблема в том, что размер таблицы результатов большой, может составлять 1 млн строк. В таком случае все еще имеет смысл удалять UDF?

Какова наилучшая практика для заполнения результата Spark SQL в другой сервис?

Ответ №1:

UDF-файлы Python не рекомендуются с точки зрения производительности, но нет ничего плохого в их использовании при необходимости, как в этом случае: стоимость сериализации / десериализации, вероятно, смешна по сравнению с ожиданиями ввода-вывода, введенными вашим send . Так что, вероятно, не имеет смысла удалять UDF.

В более общем случае есть два способа, с помощью которых вы можете уменьшить объем памяти, занимаемый обработкой фрейма данных. Один из них, о котором вы уже упоминали, — сохранить в файл и обработать файл.

Другой способ — использовать toLocalIterator в вашем фрейме данных. Таким образом, вы будете выполнять итерации по каждому из разделов фрейма данных: вы можете перераспределить фрейм данных, чтобы создать разделы произвольного размера:

 df =df.repartition(100)
for partition in df.toLocalIterator():
    for row in partition:
        send(row)
  

Таким образом, ваши требования к локальной памяти сокращаются до самого большого раздела вашего перераспределенного фрейма данных.