Apache Spark Перераспределение / группирование Лучшая практика, чтобы избежать искажения данных

#apache-spark #apache-spark-sql

#apache-spark #apache-spark-sql

Вопрос:

Я фокусируюсь на группировании и повторном разделении фрейма данных как средства группировки данных вместе, чтобы ускорить последующую обработку.

Цель группирования состоит в том, чтобы разделить данные на фиксированное количество «сегментов» по некоторому набору столбцов. Цель состоит в разделении секционированных данных на равный набор сегментов.

Как я уверен, вы поняли, общая цель состоит в том, чтобы использовать этот метод для предотвращения перекоса данных.

Ниже приведена пошаговая процедура, которую я принял для реализации группирования.

Пожалуйста, обратите внимание, я использую Spark SQL в отличие от PySpark, но принципы те же. Например, я использую функцию NTILE для сбора данных. Эта функция взята из типичного T-SQL.

В любом случае, поехали:

У меня есть следующие два фрейма данных

 df_table1 = spark.read.csv("/tamingskew/table1/", header=True)
df_table2 = spark.read.csv("/tamingskew/table2/", header=True)
  

Apache Spark принимает фрейм данных и создает отдельный раздел для каждого, как показано в следующем коде:

 df_table1.rdd.getNumPartitions()
  

1

Затем я создаю две таблицы для выполнения запросов, чтобы сгенерировать сегментированные данные:

 df_table1.createOrReplaceTempView("t1")
df_table2.createOrReplaceTempView("t2")
  

Теперь мы собираемся выполнить группирование и перераспределение в первой таблице ‘t1’

 df_pt1 = spark.sql("""SELECT
  t1.*
 ,NTILE(8) OVER (ORDER BY t1.registration) AS newpart1
FROM t1
ORDER BY newpart1 DESC
""").repartition(8, col("newpart1"))
  

Как вы можете видеть из приведенного выше, я разделил данные на 8 сегментов и перераспределил данные в 8 разделах на основе группированных данных ‘newpart1’

Затем я выполняю следующий код, чтобы просмотреть, как выглядят разделенные данные

 print("Number of partitions: {}".format(df_pt1.rdd.getNumPartitions())) 
print('Partitioning distribution: '  str(df_pt1.rdd.glom().map(len).collect()))
  

Это выглядит следующим образом:

 Number of partitions: 8
Partitioning distribution: [0, 1250, 1250, 3750, 0, 0, 2500, 1250]
  

Приведенный выше вывод не идеален, потому что, как вы можете видеть, в 3 разделах вообще нет никаких данных.

Затем я создаю таблицу из приведенного выше фрейма данных для использования в моем последнем запросе:

 df_pt1.createOrReplaceTempView('t11')
  

Затем я делаю то же самое для table2 (не буду снова вдаваться во все подробности)

 df_pt2 = spark.sql("""SELECT
  t2.*
 ,NTILE(8) OVER (ORDER BY t2.sale_price) AS newpart2
FROM t2
ORDER BY newpart2 DESC
""").repartition(8, col("newpart2"))


print("Number of partitions: {}".format(df_pt2.rdd.getNumPartitions())) 
print('Partitioning distribution: '  str(df_pt2.rdd.glom().map(len).collect()))

Number of partitions: 8
Partitioning distribution: [0, 12500, 12500, 37500, 0, 0, 25000, 12500]

df_pt2.createOrReplaceTempView('t22')
  

И окончательный запрос выглядит следующим образом:

 querywithgroups = spark.sql("""SELECT
  t11.registration
 ,AVG(t22.sale_price) AS average_price
FROM t11
INNER JOIN t22
  ON t11.make = t22.make
    AND t11.model = t22.model
WHERE ABS(t22.engine_size - t11.engine_size) <= 0.1
GROUP BY t11.registration
        ,t11.newpart1
        ,t22.newpart2
""")
  

Это обеспечивает следующие разделенные разделы:

 Number of partitions: 8
Partitioning distribution: [0, 10000, 10000, 30000, 0, 0, 20000, 10000]
  

Мой вопрос заключается в следующем: есть ли лучший способ использовать группирование для разделения данных, чтобы ускорить запросы?
Можно ли улучшить вышеуказанное.

Любые мысли с благодарностью.

Комментарии:

1. Как я понимаю, ваш основной процесс начинается с JOIN включения make и model , поэтому Spark (по моему признанию, ограниченному знанию) перетасует ваши данные, используя эти столбцы для начала. Который побеждает любое разделение, которое было сделано ранее — это будет просто пустой тратой. Я что-то упускаю?

2. @GPI, спасибо за обращение. Я, честно говоря, не знаю ответа. Я надеюсь, что кто-нибудь из SE предоставит несколько интересных предложений