#palantir-foundry #foundry-code-repositories
#palantir-foundry #foundry-code-репозитории
Вопрос:
Я думаю, что понимаю, как каждое из перераспределения, разделения улья и компоновки влияет на количество выходных файлов, но я не совсем понимаю взаимодействие различных функций. Может кто-нибудь помочь заполнить количество выходных файлов для каждой из приведенных ниже ситуаций, когда я оставил пробел? Цель состоит в том, чтобы понять, какой правильный код подходит для ситуации, когда у меня есть сочетание столбцов с высокой и низкой мощностью, по которым мне нужно разбить / разделить, где у меня есть частые операции, которые фильтруют столбцы с низкой мощностью и объединяют столбцы с высокой мощностью.
Предположим, что у нас есть фрейм данных df
, который начинается с 200 входных разделов, colA
имеет 10 уникальных значений и colB
1000 уникальных значений.
Сначала несколько, чтобы проверить мое понимание:
-
df.repartition(100)
= 100 выходных файлов одинакового размера -
df.repartition('colA')
= 10 выходных файлов разного размера, поскольку каждый файл будет содержать все строки для 1 значения colA -
df.repartition('colB')
= 1000 выходных файлов -
df.repartition(50, 'colA')
= 50 выходных файлов? -
df.repartition(50, 'colB')
= 50 выходных файлов, поэтому некоторые файлы будут содержать более одного значения ColB?
Разделы улья:
-
output.write_dataframe(df, partition_cols=['colA'])
= 1000 выходных файлов (потому что я получаю потенциально 100 файлов в каждом из 10 разделов улья 10) -
output.write_dataframe(df, partition_cols=['colB'])
= 10 000 выходных файлов -
output.write_dataframe(df, partition_cols=['colA', 'colB'])
= 100 000 выходных файлов -
output.write_dataframe(df.repartition('colA'), partition_cols=['colA'])
= 10 выходных файлов разного размера (по 1 файлу в каждом разделе улья)
Группирование:
-
output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100)
= 100 выходных файлов? В эксперименте, похоже, это не так -
output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10)
= 10 выходных файлов? -
output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10)
= ???
Теперь все вместе:
-
output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
= ??? -
output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
= ??? — Это та команда, которую я хочу использовать в конце? И все, что находится ниже по потоку, сначала фильтруется на colA, чтобы воспользоваться разделением улья, а затем присоединяется к ColB, чтобы воспользоваться объединением?
Ответ №1:
Для разбиения на разделы группирование # выходных файлов не является постоянным и будет зависеть от фактических данных входного раздела.Чтобы уточнить, предположим, что df равен 200 разделам, а не 200 файлам. Выходные файлы масштабируются с помощью # входных разделов, а не # файлов. 200 файлов могут вводить в заблуждение, поскольку это может быть от 1 раздела до 1000 разделов.
Сначала несколько, чтобы проверить мое понимание:
df.repartition(100)
= 100 выходных файлов одинакового размера
df.repartition('colA')
= 10 выходных файлов разного размера, поскольку каждый файл будет содержать все строки для 1 значения colA
df.repartition('colB')
= 1000 выходных файлов
df.repartition(50, 'colA')
= 50 выходных файлов
df.repartition(50, 'colB')
= 50 выходных файлов
Разделы улья:
output.write_dataframe(df, partition_cols=['colA'])
= верхняя граница 2000 выходных файлов (200 входных разделов * максимум 10 значений на раздел)
output.write_dataframe(df, partition_cols=['colB'])
= максимум 200 000 выходных файлов (200 * 1000 значений на раздел)
output.write_dataframe(df, partition_cols=['colA', 'colB'])
= максимум 2 000 000 выходных файлов (200 разделов * 10 значений * 1000)
output.write_dataframe(df.repartition('colA'), partition_cols=['colA'])
= 10 выходных файлов разного размера (по 1 файлу в каждом разделе улья)
Группирование:
output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100)
= максимум 20 000 файлов (200 разделов * максимум 100 сегментов на раздел)
output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10)
= максимум 2000 файлов (200 разделов * максимум 10 сегментов на раздел)
output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10)
= ровно 10 файлов (перераспределенный набор данных составляет 10 входных разделов, каждый раздел выводит только 1 сегмент)
Теперь все вместе:
output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
= Я могу ошибаться в этом, но я считаю, что это максимум 400 000 выходных файлов (200 входных разделов * 10 разделов colA * 200 сегментов ColB)
output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
= Я полагаю, что это ровно 10 000 выходных файлов (перераспределение colA, ColB = 10 000 разделов, каждый раздел содержит ровно 1 colA и 1 ведро ColB)
Ответ №2:
Предыстория
Ключом к способности рассуждать о количестве выходных файлов является понимание того, на каком уровне применяется каждая концепция.
Repartition ( df.repartition(N, 'colA', 'colB')
) создает новый этап spark с данными, перетасованными по запросу, в указанное количество перетасованных разделов. Это изменит количество задач на следующем этапе, а также расположение данных в этих задачах.
Разбиение на разделы () и partition_cols=['colA', 'colB']
группирование ( bucket_cols/bucket_count
) имеют какой-либо эффект только в рамках задач заключительного этапа и влияют на то, как задача записывает свои данные в файлы на диске.
В частности, каждая задача заключительного этапа будет записывать по одному файлу на каждую комбинацию раздела улья / корзины, присутствующую в ее данных. Комбинации, отсутствующие в этой задаче, не будут записывать пустой файл, если вы используете разбиение на разделы или компоновку.
Примечание: если не использовать разбиение на разделы или группирование, каждая задача будет записывать ровно один файл, даже если этот файл пуст.
Итак, в общем, вы всегда должны убедиться, что вы перераспределяете свои данные перед записью, чтобы убедиться, что расположение данных соответствует вашим настройкам разделения / компоновки улья (т. Е. Каждая комбинация разделов / блоков не разделяется между несколькими задачами), в противном случае вы можете в конечном итоге записать огромное количество файлов.
Ваши примеры
Я думаю, что существует некоторое недопонимание, поэтому давайте рассмотрим их один за другим.
Сначала несколько, чтобы проверить мое понимание:
df.repartition(100)
= 100 выходных файлов одинакового размера
Да — данные будут случайным образом равномерно перемешаны в 100 разделов, вызывая 100 задач, каждая из которых будет записывать ровно один файл.
df.repartition('colA')
= 10 выходных файлов разного размера, поскольку каждый файл будет содержать все строки для 1 значения colA
Нет — количество разделов для перетасовки не указано, поэтому по умолчанию оно будет равно 200. Таким образом, у вас будет 200 задач, не более 10 из которых будут содержать какие-либо данные (может быть меньше из-за коллизий хэшей), так что в итоге у вас будет 190 пустых файлов и 10 с данными. * Примечание: с AQE в spark 3 spark может решить объединить 200 разделов в меньшее количество, когда поймет, что большинство из них очень маленькие. Я не знаю точной логики, поэтому технически ответ на самом деле «200 или меньше, только 10 будут содержать данные».
df.repartition('colB')
= 1000 выходных файлов Нет — Как и выше, данные будут перетасованы в 200 разделов. Однако в этом случае все они (вероятно) будут содержать данные, поэтому вы получите 200 файлов примерно одинакового размера.
Примечание: из-за коллизий хэшей файлы могут быть больше или меньше в зависимости от того, сколько значений ColB попало в каждую секцию.
df.repartition(50, 'colA')
= 50 выходных файлов?
Да — Аналогично предыдущему, за исключением того, что теперь мы переопределили количество разделов с 200 до 50. Итак, 10 файлов с данными, 40 пустых. (или меньше из-за AQE)
df.repartition(50, 'colB')
= 50 выходных файлов, поэтому некоторые файлы будут содержать более одного значения ColB?
Да — как и раньше, мы получим 50 файлов немного разных размеров в зависимости от того, как работают хэши значений ColB.
Разделы улья:
(Я думаю, что приведенные ниже примеры написаны с учетом df
того, что для начала используется 100 разделов, а не 200, как указано, поэтому я собираюсь пойти с этим)
output.write_dataframe(df, partition_cols=['colA'])
= 1000 выходных файлов (потому что я получаю потенциально 100 файлов в каждом из 10 разделов улья 10)
Да — у вас будет 100 задач, каждая из которых будет записывать один файл для каждого значения colA, которое они видят. Таким образом, до 1000 файлов в случае случайного распределения данных.
output.write_dataframe(df, partition_cols=['colB'])
= 10 000 выходных файлов
Нет — здесь отсутствует 0. 100 задач, каждая из которых может записывать до 1000 файлов (по одному для каждого значения ColB), в общей сложности до 100 000 файлов.
output.write_dataframe(df, partition_cols=['colA', 'colB'])
= 100 000 выходных файлов
Нет — 100 задач, каждая из которых будет записывать один файл для каждой комбинации столбцов разделов, которые она видит. Таких комбинаций 10 000, так что это может записать до 100 * 10 000 = 1 000 000 файлов!
output.write_dataframe(df.repartition('colA'), partition_cols=['colA'])
= 10 выходных файлов разного размера (по 1 файлу в каждом разделе улья)
Да — repartition
наши данные будут перетасованы в 200 задач, но только 10 будут содержать данные. Каждый будет содержать ровно одно значение colA , поэтому будет записан ровно один файл. Остальные 190 задач не будут записывать файлы. Итак, ровно 10 файлов.
Группирование:
Опять же, предполагая 100 разделов для df, а не 200
output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100)
= 100 выходных файлов? В эксперименте, похоже, это не так
Нет — поскольку мы не тщательно распределили данные, у нас есть 100 задач с (возможно) случайно распределенными данными. Каждая задача будет записывать один файл на каждую корзину, которую она видит. Таким образом, это может записать до 100 * 100 = 10 000 файлов!
output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10)
= 10 выходных файлов?
Нет — Аналогично описанному выше, 100 задач, каждая из которых может записывать до 10 файлов. Итак, в наихудшем случае здесь 1000 файлов.
output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10)
= ???
Теперь мы корректируем расположение данных перед записью, у нас будет 200 задач, не более 10 из которых будут содержать какие-либо данные. Каждое значение colA будет существовать только в одной задаче.
Каждая задача будет записывать один файл на каждую корзину, которую она видит. Таким образом, мы должны получить здесь не более 10 файлов.
Примечание: Из-за коллизий хэшей один или несколько сегментов могут быть пустыми, поэтому мы можем получить не ровно 10.
Теперь все вместе:
Опять же, предполагая 100 разделов для df, а не 200
output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
= ???
100 задач. 10 разделов улья. 200 сегментов. В худшем случае каждая задача записывает по одному файлу на комбинацию раздела улья / корзины. т.е. 100 * 10 * 200 = 200 000 файлов.
output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
= ??? — Это та команда, которую я хочу использовать в конце? И все, что находится ниже по потоку, сначала фильтруется на colA, чтобы воспользоваться разделением улья, а затем присоединяется к ColB, чтобы воспользоваться объединением?
Этот хитрый. У нас есть 200 задач, и данные тщательно перемешиваются, поэтому каждая комбинация colA / ColB находится только в одной задаче. Итак, все кажется хорошим.
НО каждый сегмент содержит несколько значений ColB , и мы ничего не сделали, чтобы убедиться, что весь сегмент локализован для одной задачи spark.
Таким образом, в худшем случае мы могли бы получить один файл на значение ColB на раздел улья (значение colA). т.е. 10 * 1000 = 10000 файлов.
Учитывая наши конкретные параметры, мы можем добиться немного большего, просто сосредоточившись на оптимальном расположении сегментов:
output.write_dataframe(df.repartition(200, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)
Теперь мы следим за тем colB
, чтобы они были перетасованы точно так, как они будут группироваться, поэтому каждая задача будет содержать ровно один сегмент.
Тогда мы получим один файл для каждого значения colA в задаче (вероятно, 10, поскольку colA перемешивается случайным образом), так что не более 200 * 10 = 2000 файлов.
Это лучшее, что мы можем сделать, предполагая, что colA и ColB не коррелируют.
Заключение
Не существует универсального подхода к управлению размерами файлов.
Как правило, перед записью необходимо убедиться, что вы перетасовали свои данные, чтобы они располагались в соответствии со стратегией разделения / компоновки, которую вы применяете.
Однако особенности того, что делать, могут различаться в каждом конкретном случае в зависимости от ваших точных параметров.
Самое главное — понять, как взаимодействуют эти 3 концепции (как описано в разделе «Фон» выше), чтобы вы могли рассуждать о том, что произойдет с первыми участниками.