#apache-spark #pyspark #out-of-memory
Вопрос:
У меня проблема с памятью в Spark. Я получаю следующую ошибку:
Application application_1631612466028_1802 failed 2 times due to AM Container for appattempt_1631612466028_1802_000002 exited with exitCode: -104
Failing this attempt.Diagnostics: [2021-09-28 21:33:55.259]Container [pid=22067,containerID=container_e65_1631612466028_1802_02_000001] is running 778240B beyond the 'PHYSICAL' memory limit. Current usage: 1.5 GB of 1.5 GB physical memory used; 3.4 GB of 3.1 GB virtual memory used. Killing container.
Dump of the process-tree for container_e65_1631612466028_1802_02_000001 :
Я запускаю скрипт, используя следующую команду:
spark-submit --master yarn --deploy-mode cluster --num-executors 5 --executor-cores 5 --executor-memory 4620MB script.py
Однако. Я думаю, что проблема не в том, как я запускаю скрипт, а в том, что что-то не так в коде. Для меня это выглядит так, как будто Spark не освобождает память после итерации цикла.
Итак, что касается моего кода. Я использую два алгоритма, KMeans и ALS. Во-первых, я группирую пользователей в кластеры, а затем запускаю алгоритмы ALS для каждого кластера.
Мой код выглядит примерно так:
df_features = spark.read.format('avro').load(PATH_FEATURES, schema=SCHEMA_FEATURES)
df_reviews = spark.read.format('avro').load(PATH_REVIEWS, schema=SCHEMA_REVIEWS)
df_input = spark.read.format("avro").load(PATH_PRODUCTS, schema=SCHEMA_PRODUCTS)
df_input = df_input.where(f.col('date_created') >= initial_date)
df_input = df_input.join(df_features, df_input.product_id == df_features.id)
df_input = df_input.select('user_id', 'product_id', 'value')
df_clusters = spark.read.format("avro").load(PATH_CLUSTERS, schema=SCHEMA_CLUSTERS)
for i in range(0, 10):
df_single_cluster = df_clusters.where(f.col('prediction') == i)
df_user_from_single_cluster = df_input.join(df_single_cluster, ['user_id']).drop('prediction')
# some aggregations here...
w = Window.partitionBy('product_id')
c = 'time'
df_normalized = (df_user_from_single_cluster.withColumn('min', f.min(c).over(w))
.withColumn('max', f.max(c).over(w))
.withColumn('normalized', ((f.col(c) - f.col('min')) / (f.col('max') - f.col('min')))))
df_reviews_selected = df_reviews.join(df_single_cluster, f.col('user_id') == f.col('reviewer_id'))
# some aggregations here...
df_normalized = df_normalized.join(df_reviews, ['product_id', 'user_id'], how='full_outer')
df_normalized = df_normalized.fillna({'rating': 0, 'normalized': 0})
df_scaled = df_normalized.select('product_id', 'user_id', 'final_rating')
df_scaled = df_scaled.withColumnRenamed('final_rating', 'rating')
df_scaled = df_scaled.filter(f.col('rating') > 0)
indexer = StringIndexer(inputCol="user_id", outputCol="user_id_idx")
indexed = indexer.fit(df_scaled).transform(df_scaled)
indexer = StringIndexer(inputCol="product_id", outputCol="product_id_idx")
indexed = indexer.fit(indexed).transform(indexed)
als = ALS(maxIter=5, regParam=0.01, userCol="user_id_idx", itemCol="product_id_idx", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(indexed)
userRecs = model.recommendForAllUsers(25)
df = userRecs
df = df.withColumn('recs', f.explode('recommendations'))
df = df.select('user_id_idx', 'recs.*')
meta = [
f.metadata for f in indexed.schema.fields if f.name == "user_id_idx"]
user_labels = meta[0]["ml_attr"]["vals"]
converter = IndexToString(inputCol="user_id_idx", outputCol="user_id", labels=user_labels)
converted = converter.transform(df)
meta = [
f.metadata for f in indexed.schema.fields if f.name == "product_id_idx"]
product_labels = meta[0]["ml_attr"]["vals"]
converter = IndexToString(inputCol="product_id_idx", outputCol="product_id", labels=product_labels)
converted = converter.transform(converted)
df = converted.join(df_scaled, ['user_id', 'product_id'], how='left_anti')
df = df.withColumn('rating', f.bround('rating', 4))
df = df.where(f.col('rating') > 0.0)
# aggregations...
df = df.withColumn('value', f.to_json(f.struct(['user_id', 'product_id', 'rating', 'cluster_id'])))
df = df.withColumnRenamed("user_id", "key")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "xxx")
.option("topic", "xxx")
.save()
spark.stop()
И почему я думаю, что проблема в коде?
Потому что, если я установлю for i in range(0, 10)
, то этот код сможет полностью выполнить 1 итерацию, и он будет работать на 2-й итерации (с ошибкой, упомянутой выше). Однако, если я установлю for i in range(1, 10)
, то этот код сможет полностью выполнить 7 итераций и прервется на 8-й. Кластеры 0 и 8 являются одними из самых больших по количеству пользователей, находящихся в кластерах.
Итак, для меня это выглядит так, как будто кластер № 0 помещает много данных в память, и во время второго взаимодействия вместо освобождения памяти из кластера № 0 он пытается добавить больше данных из кластера № 1 и достигает пределов spark. В то время как во втором случае. Кластеры 1-7 довольно малы, поэтому spark может добавлять данные с каждой итерации в память, не освобождая их до тех пор, пока они не достигнут 8-й итерации, что приводит к достижению пределов spark.
Я нигде не использую df.cache()
, поэтому я не могу просто использовать df.unpersist()
. Я думал, что Сборщик мусора должен заботиться о памяти, но, очевидно, это не так. Если только я не понимаю, как работает архитектура spark или как работает управление памятью.