Сколько файлов выводится литейным преобразованием в различных комбинациях перераспределения, разделения улья и компоновки?

#palantir-foundry #foundry-code-repositories

#palantir-foundry #foundry-code-репозитории

Вопрос:

Я думаю, что понимаю, как каждое из перераспределения, разделения улья и компоновки влияет на количество выходных файлов, но я не совсем понимаю взаимодействие различных функций. Может кто-нибудь помочь заполнить количество выходных файлов для каждой из приведенных ниже ситуаций, когда я оставил пробел? Цель состоит в том, чтобы понять, какой правильный код подходит для ситуации, когда у меня есть сочетание столбцов с высокой и низкой мощностью, по которым мне нужно разбить / разделить, где у меня есть частые операции, которые фильтруют столбцы с низкой мощностью и объединяют столбцы с высокой мощностью.

Предположим, что у нас есть фрейм данных df , который начинается с 200 входных разделов, colA имеет 10 уникальных значений и colB 1000 уникальных значений.

Сначала несколько, чтобы проверить мое понимание:

  • df.repartition(100) = 100 выходных файлов одинакового размера

  • df.repartition('colA') = 10 выходных файлов разного размера, поскольку каждый файл будет содержать все строки для 1 значения colA

  • df.repartition('colB') = 1000 выходных файлов

  • df.repartition(50, 'colA') = 50 выходных файлов?

  • df.repartition(50, 'colB') = 50 выходных файлов, поэтому некоторые файлы будут содержать более одного значения ColB?

Разделы улья:

  • output.write_dataframe(df, partition_cols=['colA']) = 1000 выходных файлов (потому что я получаю потенциально 100 файлов в каждом из 10 разделов улья 10)

  • output.write_dataframe(df, partition_cols=['colB']) = 10 000 выходных файлов

  • output.write_dataframe(df, partition_cols=['colA', 'colB']) = 100 000 выходных файлов

  • output.write_dataframe(df.repartition('colA'), partition_cols=['colA']) = 10 выходных файлов разного размера (по 1 файлу в каждом разделе улья)

Группирование:

  • output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100) = 100 выходных файлов? В эксперименте, похоже, это не так

  • output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10) = 10 выходных файлов?

  • output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10) = ???

Теперь все вместе:

  • output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200) = ???

  • output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200) = ??? — Это та команда, которую я хочу использовать в конце? И все, что находится ниже по потоку, сначала фильтруется на colA, чтобы воспользоваться разделением улья, а затем присоединяется к ColB, чтобы воспользоваться объединением?

Ответ №1:

Для разбиения на разделы группирование # выходных файлов не является постоянным и будет зависеть от фактических данных входного раздела.Чтобы уточнить, предположим, что df равен 200 разделам, а не 200 файлам. Выходные файлы масштабируются с помощью # входных разделов, а не # файлов. 200 файлов могут вводить в заблуждение, поскольку это может быть от 1 раздела до 1000 разделов.

Сначала несколько, чтобы проверить мое понимание:

df.repartition(100) = 100 выходных файлов одинакового размера

df.repartition('colA') = 10 выходных файлов разного размера, поскольку каждый файл будет содержать все строки для 1 значения colA

df.repartition('colB') = 1000 выходных файлов

df.repartition(50, 'colA') = 50 выходных файлов

df.repartition(50, 'colB') = 50 выходных файлов

Разделы улья:

output.write_dataframe(df, partition_cols=['colA']) = верхняя граница 2000 выходных файлов (200 входных разделов * максимум 10 значений на раздел)

output.write_dataframe(df, partition_cols=['colB']) = максимум 200 000 выходных файлов (200 * 1000 значений на раздел)

output.write_dataframe(df, partition_cols=['colA', 'colB']) = максимум 2 000 000 выходных файлов (200 разделов * 10 значений * 1000)

output.write_dataframe(df.repartition('colA'), partition_cols=['colA']) = 10 выходных файлов разного размера (по 1 файлу в каждом разделе улья)

Группирование:

output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100) = максимум 20 000 файлов (200 разделов * максимум 100 сегментов на раздел)

output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10) = максимум 2000 файлов (200 разделов * максимум 10 сегментов на раздел)

output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10) = ровно 10 файлов (перераспределенный набор данных составляет 10 входных разделов, каждый раздел выводит только 1 сегмент)

Теперь все вместе:

output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200) = Я могу ошибаться в этом, но я считаю, что это максимум 400 000 выходных файлов (200 входных разделов * 10 разделов colA * 200 сегментов ColB)

output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200) = Я полагаю, что это ровно 10 000 выходных файлов (перераспределение colA, ColB = 10 000 разделов, каждый раздел содержит ровно 1 colA и 1 ведро ColB)

Ответ №2:

Предыстория

Ключом к способности рассуждать о количестве выходных файлов является понимание того, на каком уровне применяется каждая концепция.

Repartition ( df.repartition(N, 'colA', 'colB') ) создает новый этап spark с данными, перетасованными по запросу, в указанное количество перетасованных разделов. Это изменит количество задач на следующем этапе, а также расположение данных в этих задачах.

Разбиение на разделы () и partition_cols=['colA', 'colB'] группирование ( bucket_cols/bucket_count ) имеют какой-либо эффект только в рамках задач заключительного этапа и влияют на то, как задача записывает свои данные в файлы на диске.

В частности, каждая задача заключительного этапа будет записывать по одному файлу на каждую комбинацию раздела улья / корзины, присутствующую в ее данных. Комбинации, отсутствующие в этой задаче, не будут записывать пустой файл, если вы используете разбиение на разделы или компоновку.

Примечание: если не использовать разбиение на разделы или группирование, каждая задача будет записывать ровно один файл, даже если этот файл пуст.

Итак, в общем, вы всегда должны убедиться, что вы перераспределяете свои данные перед записью, чтобы убедиться, что расположение данных соответствует вашим настройкам разделения / компоновки улья (т. Е. Каждая комбинация разделов / блоков не разделяется между несколькими задачами), в противном случае вы можете в конечном итоге записать огромное количество файлов.

Ваши примеры

Я думаю, что существует некоторое недопонимание, поэтому давайте рассмотрим их один за другим.

Сначала несколько, чтобы проверить мое понимание:

df.repartition(100) = 100 выходных файлов одинакового размера

Да — данные будут случайным образом равномерно перемешаны в 100 разделов, вызывая 100 задач, каждая из которых будет записывать ровно один файл.

df.repartition('colA') = 10 выходных файлов разного размера, поскольку каждый файл будет содержать все строки для 1 значения colA

Нет — количество разделов для перетасовки не указано, поэтому по умолчанию оно будет равно 200. Таким образом, у вас будет 200 задач, не более 10 из которых будут содержать какие-либо данные (может быть меньше из-за коллизий хэшей), так что в итоге у вас будет 190 пустых файлов и 10 с данными. * Примечание: с AQE в spark 3 spark может решить объединить 200 разделов в меньшее количество, когда поймет, что большинство из них очень маленькие. Я не знаю точной логики, поэтому технически ответ на самом деле «200 или меньше, только 10 будут содержать данные».

df.repartition('colB') = 1000 выходных файлов Нет — Как и выше, данные будут перетасованы в 200 разделов. Однако в этом случае все они (вероятно) будут содержать данные, поэтому вы получите 200 файлов примерно одинакового размера.

Примечание: из-за коллизий хэшей файлы могут быть больше или меньше в зависимости от того, сколько значений ColB попало в каждую секцию.

df.repartition(50, 'colA') = 50 выходных файлов?

Да — Аналогично предыдущему, за исключением того, что теперь мы переопределили количество разделов с 200 до 50. Итак, 10 файлов с данными, 40 пустых. (или меньше из-за AQE)

df.repartition(50, 'colB') = 50 выходных файлов, поэтому некоторые файлы будут содержать более одного значения ColB?

Да — как и раньше, мы получим 50 файлов немного разных размеров в зависимости от того, как работают хэши значений ColB.

Разделы улья:

(Я думаю, что приведенные ниже примеры написаны с учетом df того, что для начала используется 100 разделов, а не 200, как указано, поэтому я собираюсь пойти с этим)

output.write_dataframe(df, partition_cols=['colA']) = 1000 выходных файлов (потому что я получаю потенциально 100 файлов в каждом из 10 разделов улья 10)

Да — у вас будет 100 задач, каждая из которых будет записывать один файл для каждого значения colA, которое они видят. Таким образом, до 1000 файлов в случае случайного распределения данных.

output.write_dataframe(df, partition_cols=['colB']) = 10 000 выходных файлов

Нет — здесь отсутствует 0. 100 задач, каждая из которых может записывать до 1000 файлов (по одному для каждого значения ColB), в общей сложности до 100 000 файлов.

output.write_dataframe(df, partition_cols=['colA', 'colB']) = 100 000 выходных файлов

Нет — 100 задач, каждая из которых будет записывать один файл для каждой комбинации столбцов разделов, которые она видит. Таких комбинаций 10 000, так что это может записать до 100 * 10 000 = 1 000 000 файлов!

output.write_dataframe(df.repartition('colA'), partition_cols=['colA']) = 10 выходных файлов разного размера (по 1 файлу в каждом разделе улья)

Да repartition наши данные будут перетасованы в 200 задач, но только 10 будут содержать данные. Каждый будет содержать ровно одно значение colA , поэтому будет записан ровно один файл. Остальные 190 задач не будут записывать файлы. Итак, ровно 10 файлов.

Группирование:

Опять же, предполагая 100 разделов для df, а не 200

output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100) = 100 выходных файлов? В эксперименте, похоже, это не так

Нет — поскольку мы не тщательно распределили данные, у нас есть 100 задач с (возможно) случайно распределенными данными. Каждая задача будет записывать один файл на каждую корзину, которую она видит. Таким образом, это может записать до 100 * 100 = 10 000 файлов!

output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10) = 10 выходных файлов?

Нет — Аналогично описанному выше, 100 задач, каждая из которых может записывать до 10 файлов. Итак, в наихудшем случае здесь 1000 файлов.

output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10) = ???

Теперь мы корректируем расположение данных перед записью, у нас будет 200 задач, не более 10 из которых будут содержать какие-либо данные. Каждое значение colA будет существовать только в одной задаче.

Каждая задача будет записывать один файл на каждую корзину, которую она видит. Таким образом, мы должны получить здесь не более 10 файлов.

Примечание: Из-за коллизий хэшей один или несколько сегментов могут быть пустыми, поэтому мы можем получить не ровно 10.

Теперь все вместе:

Опять же, предполагая 100 разделов для df, а не 200

output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200) = ???

100 задач. 10 разделов улья. 200 сегментов. В худшем случае каждая задача записывает по одному файлу на комбинацию раздела улья / корзины. т.е. 100 * 10 * 200 = 200 000 файлов.

output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200) = ??? — Это та команда, которую я хочу использовать в конце? И все, что находится ниже по потоку, сначала фильтруется на colA, чтобы воспользоваться разделением улья, а затем присоединяется к ColB, чтобы воспользоваться объединением?

Этот хитрый. У нас есть 200 задач, и данные тщательно перемешиваются, поэтому каждая комбинация colA / ColB находится только в одной задаче. Итак, все кажется хорошим.

НО каждый сегмент содержит несколько значений ColB , и мы ничего не сделали, чтобы убедиться, что весь сегмент локализован для одной задачи spark.

Таким образом, в худшем случае мы могли бы получить один файл на значение ColB на раздел улья (значение colA). т.е. 10 * 1000 = 10000 файлов.

Учитывая наши конкретные параметры, мы можем добиться немного большего, просто сосредоточившись на оптимальном расположении сегментов:

 output.write_dataframe(df.repartition(200, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
  

Теперь мы следим за тем colB , чтобы они были перетасованы точно так, как они будут группироваться, поэтому каждая задача будет содержать ровно один сегмент.

Тогда мы получим один файл для каждого значения colA в задаче (вероятно, 10, поскольку colA перемешивается случайным образом), так что не более 200 * 10 = 2000 файлов.

Это лучшее, что мы можем сделать, предполагая, что colA и ColB не коррелируют.

Заключение

Не существует универсального подхода к управлению размерами файлов.

Как правило, перед записью необходимо убедиться, что вы перетасовали свои данные, чтобы они располагались в соответствии со стратегией разделения / компоновки, которую вы применяете.

Однако особенности того, что делать, могут различаться в каждом конкретном случае в зависимости от ваших точных параметров.

Самое главное — понять, как взаимодействуют эти 3 концепции (как описано в разделе «Фон» выше), чтобы вы могли рассуждать о том, что произойдет с первыми участниками.