Разделение данных Apache Spark с использованием SQL-функции nTile

#apache-spark #pyspark #databricks

#apache-spark #pyspark #блоки данных

Вопрос:

Я пытаюсь несколькими способами оптимизировать выполнение больших наборов данных с помощью разбиения. В частности, я использую функцию, обычно используемую с традиционными базами данных SQL, называемую nTile.

Цель состоит в том, чтобы поместить определенное количество строк в корзину, используя комбинацию buckettind и перераспределения. Это позволяет Apache Spark обрабатывать данные более эффективно при обработке секционированных наборов данных или, я должен сказать, группированных наборов данных.

Ниже приведены два примера. В первом примере показано, как я использовал ntile для разделения набора данных на два сегмента с последующим разделением данных на 2 раздела в сегментированном nTile с именем skew_data.

Затем я выполняю тот же запрос, но без какой-либо привязки или перераспределения.

Проблема в том, что запрос без разбиения на сегменты выполняется быстрее, чем запрос с разбиением на сегменты, даже запрос без разбиения на сегменты помещает все данные в один раздел, тогда как запрос с разбиением на сегменты разбивает запрос на 2 раздела.

Может кто-нибудь сообщить мне, почему это так.

К вашему сведению, я выполняю запрос в кластере Apache Spark из Databricks. В кластере всего один узел с 2 ядрами и 15 ГБ памяти.

Первый пример с разделением и перераспределением nTile / Bucketting

 allin = spark.sql("""
  SELECT
    t1.make
    , t2.model
    , NTILE(2) OVER (ORDER BY t2.sale_price) AS skew_data
  FROM 
    t1 INNER JOIN t2
    ON t1.engine_size = t2.engine_size2
""")
.repartition(2, col("skew_data"), rand())
.drop('skew_data')
  

Приведенный выше код разбивает данные на разделы следующим образом, с соответствующим распределением разделов

 Number of partitions: 2
Partitioning distribution: [5556767, 5556797]
  

Второй пример: без nTile / разбиения на сегменты или перераспределения

 allin_NO_nTile = spark.sql("""
  SELECT
    t1.make
    ,t2.model
  FROM 
    t1 INNER JOIN t2
    ON t1.engine_size = t2.engine_size2
""")
  

Приведенный выше код помещает все данные в один раздел, как показано ниже:

 Number of partitions: 1
Partitioning distribution: [11113564]
  

Мой вопрос в том, почему второй запрос (без nTile или перераспределения) выполняется быстрее, чем запрос с nTile и перераспределением?

Я приложил немало усилий, чтобы изложить этот вопрос как можно полнее, но если вам нужны дополнительные пояснения, пожалуйста, не стесняйтесь спрашивать. Я действительно хочу разобраться в этом.

Комментарии:

1. Каков физический план для двух запросов? второй запрос потенциально может быть широковещательным хэш-соединением, которое будет быстрее, чем объединение слиянием сортировки или оконная функция, что потребует обмена и сортировки данных.

2. @AndrewLong, спасибо за обращение. Я так и не дошел до сути этого вопроса. Однако мне указали в направлении использования функции bucketBy() вместо этого.

Ответ №1:

Я отказался от своего первоначального подхода и использовал новую функцию PySpark с именем bucketBy(). Если вы хотите знать, как применить bucketBy() к данным корзины, перейдите по https://www.youtube.com/watch?v=dv7IIYuQOXIamp;list=PLOmMQN2IKdjvowfXo_7hnFJHjcE3JOKwuamp;index=39