Как обрабатывать перекос данных в оконных функциях Spark?

#apache-spark #pyspark #apache-spark-sql #window-functions

#apache-spark #pyspark #apache-spark-sql #окно-функции

Вопрос:

У меня есть набор данных, который я пытаюсь обработать в PySpark. Данные (на диске в виде паркета) содержат идентификаторы пользователей, идентификаторы сеансов и метаданные, относящиеся к каждому сеансу. Я добавляю в свой фрейм данных несколько столбцов, которые являются результатом агрегирования по окну. Проблема, с которой я сталкиваюсь, заключается в том, что все, кроме 4-6 исполнителей, завершатся быстро, а остальные будут выполняться вечно без завершения. Мой код выглядит так:

 import pyspark.sql.functions as f
from pyspark.sql.window import Window

empty_col_a_cond = ((f.col("col_A").isNull()) |
                         (f.col("col_A") == ""))

session_window = Window.partitionBy("user_id", "session_id") 
                       .orderBy(f.col("step_id").asc())

output_df = (
    input_df 
    .withColumn("col_A_val", f
                .when(empty_col_a_cond, f.lit("NA"))
                .otherwise(f.col("col_A"))) 
    # ... 10 more added columns replacing nulls/empty strings
    .repartition("user_id", "session_id")
    .withColumn("s_user_id", f.first("user_id", True).over(session_window)) 
    .withColumn("s_col_B", f.collect_list("col_B").over(session_window)) 
    .withColumn("s_col_C", f.min("col_C").over(session_window)) 
    .withColumn("s_col_D", f.max("col_D").over(session_window)) 
    # ... 16 more added columns aggregating over session_window
    .where(f.col("session_flag") == 1) 
    .where(f.array_contains(f.col("s_col_B"), "some_val"))
)
  

В своих журналах я вижу это снова и снова:

 INFO ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
INFO UnsafeExternalSorter: Thread 92 spilling sort data of 9.2 GB to disk (2  times so far)
INFO UnsafeExternalSorter: Thread 91 spilling sort data of 19.3 GB to disk (0  time so far)
  

Это говорит о том, что Spark не может хранить все оконные данные в памяти. Я попытался увеличить внутренние настройки spark.sql.windowExec.buffer.in.memory.threshold и spark.sql.windowExec.buffer.spill.threshold , что немного помогло, но исполнители все еще не завершены.

Я считаю, что все это вызвано некоторым искажением данных. Группировка по обоим user_id и session_id , есть 5 записей с количеством> = 10000, 100 записей с количеством от 1000 до 10000 и 150 000 записей с количеством менее 1000 (обычно count = 1).

 input_df 
    .groupBy(f.col("user_id"), f.col("session_id")) 
    .count() 
    .filter("count < 1000") 
    .count()

# >= 10k, 6
# < 10k and >= 1k, 108
# < 1k, 150k
  

Это результирующая база данных заданий:

Поиск базы данных Spark job

Комментарии:

1. Вы нашли ответ на этот вопрос?

2. @ShubhamGupta Еще нет. Я пытался солить данные, перераспределять их, но пока безуспешно.