#apache-spark #pyspark #apache-spark-sql #window-functions
#apache-spark #pyspark #apache-spark-sql #окно-функции
Вопрос:
У меня есть набор данных, который я пытаюсь обработать в PySpark. Данные (на диске в виде паркета) содержат идентификаторы пользователей, идентификаторы сеансов и метаданные, относящиеся к каждому сеансу. Я добавляю в свой фрейм данных несколько столбцов, которые являются результатом агрегирования по окну. Проблема, с которой я сталкиваюсь, заключается в том, что все, кроме 4-6 исполнителей, завершатся быстро, а остальные будут выполняться вечно без завершения. Мой код выглядит так:
import pyspark.sql.functions as f
from pyspark.sql.window import Window
empty_col_a_cond = ((f.col("col_A").isNull()) |
(f.col("col_A") == ""))
session_window = Window.partitionBy("user_id", "session_id")
.orderBy(f.col("step_id").asc())
output_df = (
input_df
.withColumn("col_A_val", f
.when(empty_col_a_cond, f.lit("NA"))
.otherwise(f.col("col_A")))
# ... 10 more added columns replacing nulls/empty strings
.repartition("user_id", "session_id")
.withColumn("s_user_id", f.first("user_id", True).over(session_window))
.withColumn("s_col_B", f.collect_list("col_B").over(session_window))
.withColumn("s_col_C", f.min("col_C").over(session_window))
.withColumn("s_col_D", f.max("col_D").over(session_window))
# ... 16 more added columns aggregating over session_window
.where(f.col("session_flag") == 1)
.where(f.array_contains(f.col("s_col_B"), "some_val"))
)
В своих журналах я вижу это снова и снова:
INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to disk (2 times so far)
INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to disk (0 time so far)
Это говорит о том, что Spark не может хранить все оконные данные в памяти. Я попытался увеличить внутренние настройки spark.sql.windowExec.buffer.in.memory.threshold
и spark.sql.windowExec.buffer.spill.threshold
, что немного помогло, но исполнители все еще не завершены.
Я считаю, что все это вызвано некоторым искажением данных. Группировка по обоим user_id
и session_id
, есть 5 записей с количеством> = 10000, 100 записей с количеством от 1000 до 10000 и 150 000 записей с количеством менее 1000 (обычно count = 1).
input_df
.groupBy(f.col("user_id"), f.col("session_id"))
.count()
.filter("count < 1000")
.count()
# >= 10k, 6
# < 10k and >= 1k, 108
# < 1k, 150k
Это результирующая база данных заданий:
Комментарии:
1. Вы нашли ответ на этот вопрос?
2. @ShubhamGupta Еще нет. Я пытался солить данные, перераспределять их, но пока безуспешно.