#apache-spark #pyspark #apache-spark-sql #pyspark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
Я прочитал
https://medium.com/teads-engineering/spark-performance-tuning-from-the-trenches-7cbde521cf60
Предлагается не использовать UDF для экономии затрат на десериализацию / сериализацию.
В моем случае я выполнил запрос, подобный этому
select MYFUN(f1, f2, ...)
from A ...
Я использую MYFUN для постобработки результатов запроса строка за строкой, например, отправки их в другую службу.
def my_fun(f1, f2, ...):
service.send(f1, f2, ...)
session.udf.register('MYFUN', my_fun)
Без использования UDF я могу захотеть сохранить результаты запроса во фрейме данных Python или в таблице Parque в hdfs, затем считывать с помощью фрейма данных и обрабатывать фреймы данных один за другим.
Проблема в том, что размер таблицы результатов большой, может составлять 1 млн строк. В таком случае все еще имеет смысл удалять UDF?
Какова наилучшая практика для заполнения результата Spark SQL в другой сервис?
Ответ №1:
UDF-файлы Python не рекомендуются с точки зрения производительности, но нет ничего плохого в их использовании при необходимости, как в этом случае: стоимость сериализации / десериализации, вероятно, смешна по сравнению с ожиданиями ввода-вывода, введенными вашим send
. Так что, вероятно, не имеет смысла удалять UDF.
В более общем случае есть два способа, с помощью которых вы можете уменьшить объем памяти, занимаемый обработкой фрейма данных. Один из них, о котором вы уже упоминали, — сохранить в файл и обработать файл.
Другой способ — использовать toLocalIterator в вашем фрейме данных. Таким образом, вы будете выполнять итерации по каждому из разделов фрейма данных: вы можете перераспределить фрейм данных, чтобы создать разделы произвольного размера:
df =df.repartition(100)
for partition in df.toLocalIterator():
for row in partition:
send(row)
Таким образом, ваши требования к локальной памяти сокращаются до самого большого раздела вашего перераспределенного фрейма данных.