(SPARK) Каков наилучший способ разделения данных, к которым применяются несколько фильтров?

#apache-spark #pyspark #filtering #data-partitioning #azure-databricks

#apache-spark #pyspark #фильтрация #разделение данных #azure-databricks

Вопрос:

Я работаю в Spark (в Azure databricks) с файлом в 15 миллиардов строк, который выглядит следующим образом :

  --------- --------------- ---------------- ------------- -------- ------ 
|client_id|transaction_key|transaction_date|   product_id|store_id|spend|
 --------- --------------- ---------------- ------------- -------- ------ 
|        1|  7587_20121224|      2012-12-24|     38081275|     787| 4.54|
|        1| 10153_20121224|      2012-12-24|         4011|    1053| 2.97|
|        2|  6823_20121224|      2012-12-24|    561122924|     683| 2.94|
|        3| 11131_20121224|      2012-12-24|     80026282|    1131|  0.4|
|        3|  7587_20121224|      2012-12-24|        92532|     787| 5.49|
  

Эти данные используются для всех моих запросов, которые в основном состоят из groupby (например, product_id), sum и count distinct :

 results = trx.filter(col("transaction_date") > "2018-01-01"
                     amp; 
                     col("product_id").isin(["38081275", "4011"])
             .groupby("product_id")
             .agg(sum("spend").alias("total_spend"),
                  countdistinct("transaction_key").alias("number_trx"))
  

Мне никогда не нужно использовать 100% этих данных, я всегда начинаю с фильтра на :

  • transaction_date (1000 различных значений)
  • product_id (1 000 000 различных значений)
  • store_id (1000 различных значений)

==> Каков наилучший способ разделения этих данных в файле parquet?

Изначально я разделил данные на transaction_date :

 trx.write.format("parquet").mode("overwrite").partitionBy("transaction_date").save("dbfs:/linkToParquetFile")
  

Это создаст разделы примерно одинакового размера.
Однако для большинства запросов потребуется сохранить не менее 60% от transaction_date, тогда как в 1 запросе обычно выбирается всего несколько идентификаторов продукта.
(обычно хранится 70% от store_id)

==> Есть ли способ создать файл parquet с учетом этого?

Кажется, что разделение данных на product_id создало бы слишком много разделов…

Спасибо!

Ответ №1:

например, вы можете использовать несколько столбцов для разделения (это создает вложенные папки), а spark может использовать фильтры разделов

еще одна хорошая идея — собрать больше информации здесь (чтобы избежать дополнительной перетасовки)

Пример с hive

 trx.write.partitionBy("transaction_date", "store_id").bucketBy(1000, "product_id").saveAsTable("tableName")
  

чтобы прочитать это, используйте

 spark.table("tableName")
  

Комментарии:

1. Спасибо @Kamrus, это сработало! По какой-то причине я думал, что разделение несколькими столбцами создаст папки на основе комбинации этих столбцов. Однако он создает вложенные папки в каждой папке, что позволяет мне очень эффективно использовать фильтры разделов.