Spark — Проблема с памятью в коде с циклом

#apache-spark #pyspark #out-of-memory

Вопрос:

У меня проблема с памятью в Spark. Я получаю следующую ошибку:

 Application application_1631612466028_1802 failed 2 times due to AM Container for appattempt_1631612466028_1802_000002 exited with exitCode: -104
Failing this attempt.Diagnostics: [2021-09-28 21:33:55.259]Container [pid=22067,containerID=container_e65_1631612466028_1802_02_000001] is running 778240B beyond the 'PHYSICAL' memory limit. Current usage: 1.5 GB of 1.5 GB physical memory used; 3.4 GB of 3.1 GB virtual memory used. Killing container.
Dump of the process-tree for container_e65_1631612466028_1802_02_000001 :
 

Я запускаю скрипт, используя следующую команду:

 spark-submit --master yarn --deploy-mode cluster --num-executors 5 --executor-cores 5 --executor-memory 4620MB script.py
 

Однако. Я думаю, что проблема не в том, как я запускаю скрипт, а в том, что что-то не так в коде. Для меня это выглядит так, как будто Spark не освобождает память после итерации цикла.

Итак, что касается моего кода. Я использую два алгоритма, KMeans и ALS. Во-первых, я группирую пользователей в кластеры, а затем запускаю алгоритмы ALS для каждого кластера.

Мой код выглядит примерно так:

     df_features = spark.read.format('avro').load(PATH_FEATURES, schema=SCHEMA_FEATURES)
    df_reviews = spark.read.format('avro').load(PATH_REVIEWS, schema=SCHEMA_REVIEWS)

    df_input = spark.read.format("avro").load(PATH_PRODUCTS, schema=SCHEMA_PRODUCTS)
    df_input = df_input.where(f.col('date_created') >= initial_date)
    df_input = df_input.join(df_features, df_input.product_id == df_features.id)
    df_input = df_input.select('user_id', 'product_id', 'value')

    df_clusters = spark.read.format("avro").load(PATH_CLUSTERS, schema=SCHEMA_CLUSTERS)

    for i in range(0, 10):
        df_single_cluster = df_clusters.where(f.col('prediction') == i)
        df_user_from_single_cluster = df_input.join(df_single_cluster, ['user_id']).drop('prediction')
        
        # some aggregations here...

        w = Window.partitionBy('product_id')
        c = 'time'
        df_normalized = (df_user_from_single_cluster.withColumn('min', f.min(c).over(w))
              .withColumn('max', f.max(c).over(w))
              .withColumn('normalized', ((f.col(c) - f.col('min')) / (f.col('max') - f.col('min')))))

        df_reviews_selected = df_reviews.join(df_single_cluster, f.col('user_id') == f.col('reviewer_id'))

        # some aggregations here...

        df_normalized = df_normalized.join(df_reviews, ['product_id', 'user_id'], how='full_outer')
        df_normalized = df_normalized.fillna({'rating': 0, 'normalized': 0})

        df_scaled = df_normalized.select('product_id', 'user_id', 'final_rating')
        df_scaled = df_scaled.withColumnRenamed('final_rating', 'rating')
        df_scaled = df_scaled.filter(f.col('rating') > 0)
        

        indexer = StringIndexer(inputCol="user_id", outputCol="user_id_idx")
        indexed = indexer.fit(df_scaled).transform(df_scaled)

        indexer = StringIndexer(inputCol="product_id", outputCol="product_id_idx")
        indexed = indexer.fit(indexed).transform(indexed)

        als = ALS(maxIter=5, regParam=0.01, userCol="user_id_idx", itemCol="product_id_idx", ratingCol="rating",
                  coldStartStrategy="drop")


        model = als.fit(indexed)
        userRecs = model.recommendForAllUsers(25)

        df = userRecs
        df = df.withColumn('recs', f.explode('recommendations'))
        df = df.select('user_id_idx', 'recs.*')

        meta = [
            f.metadata for f in indexed.schema.fields if f.name == "user_id_idx"]
        user_labels = meta[0]["ml_attr"]["vals"]

        converter = IndexToString(inputCol="user_id_idx", outputCol="user_id", labels=user_labels)
        converted = converter.transform(df)

        meta = [
            f.metadata for f in indexed.schema.fields if f.name == "product_id_idx"]
        product_labels = meta[0]["ml_attr"]["vals"]

        converter = IndexToString(inputCol="product_id_idx", outputCol="product_id", labels=product_labels)
        converted = converter.transform(converted)


        df = converted.join(df_scaled, ['user_id', 'product_id'], how='left_anti')
        df = df.withColumn('rating', f.bround('rating', 4))
        df = df.where(f.col('rating') > 0.0)
        
        # aggregations...

        df = df.withColumn('value', f.to_json(f.struct(['user_id', 'product_id', 'rating', 'cluster_id'])))
        df = df.withColumnRenamed("user_id", "key")


        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", "xxx")
        .option("topic", "xxx")
        .save()

    spark.stop()
 

И почему я думаю, что проблема в коде?
Потому что, если я установлю for i in range(0, 10) , то этот код сможет полностью выполнить 1 итерацию, и он будет работать на 2-й итерации (с ошибкой, упомянутой выше). Однако, если я установлю for i in range(1, 10) , то этот код сможет полностью выполнить 7 итераций и прервется на 8-й. Кластеры 0 и 8 являются одними из самых больших по количеству пользователей, находящихся в кластерах.

Итак, для меня это выглядит так, как будто кластер № 0 помещает много данных в память, и во время второго взаимодействия вместо освобождения памяти из кластера № 0 он пытается добавить больше данных из кластера № 1 и достигает пределов spark. В то время как во втором случае. Кластеры 1-7 довольно малы, поэтому spark может добавлять данные с каждой итерации в память, не освобождая их до тех пор, пока они не достигнут 8-й итерации, что приводит к достижению пределов spark.
Я нигде не использую df.cache() , поэтому я не могу просто использовать df.unpersist() . Я думал, что Сборщик мусора должен заботиться о памяти, но, очевидно, это не так. Если только я не понимаю, как работает архитектура spark или как работает управление памятью.