#apache-spark #pyspark #databricks
#apache-spark #pyspark #блоки данных
Вопрос:
Я пытаюсь несколькими способами оптимизировать выполнение больших наборов данных с помощью разбиения. В частности, я использую функцию, обычно используемую с традиционными базами данных SQL, называемую nTile.
Цель состоит в том, чтобы поместить определенное количество строк в корзину, используя комбинацию buckettind и перераспределения. Это позволяет Apache Spark обрабатывать данные более эффективно при обработке секционированных наборов данных или, я должен сказать, группированных наборов данных.
Ниже приведены два примера. В первом примере показано, как я использовал ntile для разделения набора данных на два сегмента с последующим разделением данных на 2 раздела в сегментированном nTile с именем skew_data.
Затем я выполняю тот же запрос, но без какой-либо привязки или перераспределения.
Проблема в том, что запрос без разбиения на сегменты выполняется быстрее, чем запрос с разбиением на сегменты, даже запрос без разбиения на сегменты помещает все данные в один раздел, тогда как запрос с разбиением на сегменты разбивает запрос на 2 раздела.
Может кто-нибудь сообщить мне, почему это так.
К вашему сведению, я выполняю запрос в кластере Apache Spark из Databricks. В кластере всего один узел с 2 ядрами и 15 ГБ памяти.
Первый пример с разделением и перераспределением nTile / Bucketting
allin = spark.sql("""
SELECT
t1.make
, t2.model
, NTILE(2) OVER (ORDER BY t2.sale_price) AS skew_data
FROM
t1 INNER JOIN t2
ON t1.engine_size = t2.engine_size2
""")
.repartition(2, col("skew_data"), rand())
.drop('skew_data')
Приведенный выше код разбивает данные на разделы следующим образом, с соответствующим распределением разделов
Number of partitions: 2
Partitioning distribution: [5556767, 5556797]
Второй пример: без nTile / разбиения на сегменты или перераспределения
allin_NO_nTile = spark.sql("""
SELECT
t1.make
,t2.model
FROM
t1 INNER JOIN t2
ON t1.engine_size = t2.engine_size2
""")
Приведенный выше код помещает все данные в один раздел, как показано ниже:
Number of partitions: 1
Partitioning distribution: [11113564]
Мой вопрос в том, почему второй запрос (без nTile или перераспределения) выполняется быстрее, чем запрос с nTile и перераспределением?
Я приложил немало усилий, чтобы изложить этот вопрос как можно полнее, но если вам нужны дополнительные пояснения, пожалуйста, не стесняйтесь спрашивать. Я действительно хочу разобраться в этом.
Комментарии:
1. Каков физический план для двух запросов? второй запрос потенциально может быть широковещательным хэш-соединением, которое будет быстрее, чем объединение слиянием сортировки или оконная функция, что потребует обмена и сортировки данных.
2. @AndrewLong, спасибо за обращение. Я так и не дошел до сути этого вопроса. Однако мне указали в направлении использования функции bucketBy() вместо этого.
Ответ №1:
Я отказался от своего первоначального подхода и использовал новую функцию PySpark с именем bucketBy(). Если вы хотите знать, как применить bucketBy() к данным корзины, перейдите по https://www.youtube.com/watch?v=dv7IIYuQOXIamp;list=PLOmMQN2IKdjvowfXo_7hnFJHjcE3JOKwuamp;index=39