#apache-spark #apache-spark-sql
#apache-spark #apache-spark-sql
Вопрос:
Я фокусируюсь на группировании и повторном разделении фрейма данных как средства группировки данных вместе, чтобы ускорить последующую обработку.
Цель группирования состоит в том, чтобы разделить данные на фиксированное количество «сегментов» по некоторому набору столбцов. Цель состоит в разделении секционированных данных на равный набор сегментов.
Как я уверен, вы поняли, общая цель состоит в том, чтобы использовать этот метод для предотвращения перекоса данных.
Ниже приведена пошаговая процедура, которую я принял для реализации группирования.
Пожалуйста, обратите внимание, я использую Spark SQL в отличие от PySpark, но принципы те же. Например, я использую функцию NTILE для сбора данных. Эта функция взята из типичного T-SQL.
В любом случае, поехали:
У меня есть следующие два фрейма данных
df_table1 = spark.read.csv("/tamingskew/table1/", header=True)
df_table2 = spark.read.csv("/tamingskew/table2/", header=True)
Apache Spark принимает фрейм данных и создает отдельный раздел для каждого, как показано в следующем коде:
df_table1.rdd.getNumPartitions()
1
Затем я создаю две таблицы для выполнения запросов, чтобы сгенерировать сегментированные данные:
df_table1.createOrReplaceTempView("t1")
df_table2.createOrReplaceTempView("t2")
Теперь мы собираемся выполнить группирование и перераспределение в первой таблице ‘t1’
df_pt1 = spark.sql("""SELECT
t1.*
,NTILE(8) OVER (ORDER BY t1.registration) AS newpart1
FROM t1
ORDER BY newpart1 DESC
""").repartition(8, col("newpart1"))
Как вы можете видеть из приведенного выше, я разделил данные на 8 сегментов и перераспределил данные в 8 разделах на основе группированных данных ‘newpart1’
Затем я выполняю следующий код, чтобы просмотреть, как выглядят разделенные данные
print("Number of partitions: {}".format(df_pt1.rdd.getNumPartitions()))
print('Partitioning distribution: ' str(df_pt1.rdd.glom().map(len).collect()))
Это выглядит следующим образом:
Number of partitions: 8
Partitioning distribution: [0, 1250, 1250, 3750, 0, 0, 2500, 1250]
Приведенный выше вывод не идеален, потому что, как вы можете видеть, в 3 разделах вообще нет никаких данных.
Затем я создаю таблицу из приведенного выше фрейма данных для использования в моем последнем запросе:
df_pt1.createOrReplaceTempView('t11')
Затем я делаю то же самое для table2 (не буду снова вдаваться во все подробности)
df_pt2 = spark.sql("""SELECT
t2.*
,NTILE(8) OVER (ORDER BY t2.sale_price) AS newpart2
FROM t2
ORDER BY newpart2 DESC
""").repartition(8, col("newpart2"))
print("Number of partitions: {}".format(df_pt2.rdd.getNumPartitions()))
print('Partitioning distribution: ' str(df_pt2.rdd.glom().map(len).collect()))
Number of partitions: 8
Partitioning distribution: [0, 12500, 12500, 37500, 0, 0, 25000, 12500]
df_pt2.createOrReplaceTempView('t22')
И окончательный запрос выглядит следующим образом:
querywithgroups = spark.sql("""SELECT
t11.registration
,AVG(t22.sale_price) AS average_price
FROM t11
INNER JOIN t22
ON t11.make = t22.make
AND t11.model = t22.model
WHERE ABS(t22.engine_size - t11.engine_size) <= 0.1
GROUP BY t11.registration
,t11.newpart1
,t22.newpart2
""")
Это обеспечивает следующие разделенные разделы:
Number of partitions: 8
Partitioning distribution: [0, 10000, 10000, 30000, 0, 0, 20000, 10000]
Мой вопрос заключается в следующем: есть ли лучший способ использовать группирование для разделения данных, чтобы ускорить запросы?
Можно ли улучшить вышеуказанное.
Любые мысли с благодарностью.
Комментарии:
1. Как я понимаю, ваш основной процесс начинается с
JOIN
включенияmake
иmodel
, поэтому Spark (по моему признанию, ограниченному знанию) перетасует ваши данные, используя эти столбцы для начала. Который побеждает любое разделение, которое было сделано ранее — это будет просто пустой тратой. Я что-то упускаю?2. @GPI, спасибо за обращение. Я, честно говоря, не знаю ответа. Я надеюсь, что кто-нибудь из SE предоставит несколько интересных предложений