#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
У меня есть папка, в которой есть набор файлов parquet. Мой вариант использования прост
- Прочитайте файл parquet
- Преобразуйте его в pandas.
- Выполните некоторую обработку и покажите результат для каждого файла parquet.
Когда я перебираю несколько паркетов, это работает идеально. Но когда я запускаю его над сотнями файлов, он вылетает из-за ошибок ООМ.
Ниже приведен код, как я читаю parquet как pandas
def read_parquet_as_pandas(path, spark: SparkSession):
df = spark.read.parquet(path).toPandas()
df.sort_values('MY_INDEX', inplace=True)
df.reset_index(inplace=True, drop=True)
return df
Вышеупомянутая функция вызывается в цикле for, как показано ниже
spark = SparkSession.builder
.master('local[1]')
.config("spark.executor.memory", "6g")
.config("spark.driver.memory", "10g")
.getOrCreate()
for i in paths:
df = read_parquet_as_pandas(i, spark)
Память продолжает накапливаться, поэтому я обратился к TraceMalloc, чтобы посмотреть, куда это идет, И все снимки имели следующую строку сверху
python3.6/site-packages/pyspark/serializers.py:587: size=9139 KiB ( 9139 KiB), count=161624 ( 161624), average=58 B
python3.6/site-packages/pyspark/serializers.py:587: size=12.9 MiB ( 4028 KiB), count=233156 ( 71532), average=58 B
python3.6/site-packages/pyspark/serializers.py:587: size=44.1 MiB ( 31.2 MiB), count=796558 ( 563402), average=58 B
Как вы можете видеть, оно продолжает увеличиваться ,
- Почему он не освобождает содержимое из предыдущих файлов parquet?
- Как освободить от него память?
Комментарии:
1. попробуйте spark.catalog.clearCache() …. после каждой итерации.
2. Нет, это никак не влияет на выходные данные.
3. в вашем цикле for отмените сохранение df для каждой итерации, как только вы закончите обработку этого конкретного файла