Почему pyspark/serializers.py не освобождает память?

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

У меня есть папка, в которой есть набор файлов parquet. Мой вариант использования прост

  1. Прочитайте файл parquet
  2. Преобразуйте его в pandas.
  3. Выполните некоторую обработку и покажите результат для каждого файла parquet.

Когда я перебираю несколько паркетов, это работает идеально. Но когда я запускаю его над сотнями файлов, он вылетает из-за ошибок ООМ.

Ниже приведен код, как я читаю parquet как pandas

 def read_parquet_as_pandas(path, spark: SparkSession):
    df = spark.read.parquet(path).toPandas()
    df.sort_values('MY_INDEX', inplace=True)
    df.reset_index(inplace=True, drop=True)
    return df
  

Вышеупомянутая функция вызывается в цикле for, как показано ниже

 spark = SparkSession.builder 
    .master('local[1]') 
    .config("spark.executor.memory", "6g") 
    .config("spark.driver.memory", "10g")
    .getOrCreate()

for i in paths:
    df = read_parquet_as_pandas(i, spark)
  

Память продолжает накапливаться, поэтому я обратился к TraceMalloc, чтобы посмотреть, куда это идет, И все снимки имели следующую строку сверху

 python3.6/site-packages/pyspark/serializers.py:587: size=9139 KiB ( 9139 KiB), count=161624 ( 161624), average=58 B
python3.6/site-packages/pyspark/serializers.py:587: size=12.9 MiB ( 4028 KiB), count=233156 ( 71532), average=58 B
python3.6/site-packages/pyspark/serializers.py:587: size=44.1 MiB ( 31.2 MiB), count=796558 ( 563402), average=58 B
  

Как вы можете видеть, оно продолжает увеличиваться ,

  1. Почему он не освобождает содержимое из предыдущих файлов parquet?
  2. Как освободить от него память?

Комментарии:

1. попробуйте spark.catalog.clearCache() …. после каждой итерации.

2. Нет, это никак не влияет на выходные данные.

3. в вашем цикле for отмените сохранение df для каждой итерации, как только вы закончите обработку этого конкретного файла