#python #dask #dask-distributed #dask-dataframe
#python #dask #dask-распределенный #dask-dataframe
Вопрос:
Я пытаюсь замаскировать, а затем применить unique
операцию к одному столбцу. Упрощенная версия кода, который я использую, приведена ниже:
import numpy as np
import pandas as pd
import dask.dataframe as dd
data = np.random.randint(0,100,(1000,2))
ddf = dd.from_pandas(pd.DataFrame(data, columns = ['data','id']), npartitions = 2)
mask = ddf['data'] > 0
unique_false = ddf[~mask]['id'].unique()
unique_true = ddf[mask]['id'].unique()
results = dask.compute([unique_true, unique_false])
Этот быстрый пример работает нормально. Мои реальные данные состоят из ~5000
столбцов, где один столбец используется для фильтрации, а другой — для получения уникальных идентификаторов. Данные хранятся в 200
паркетных разделах, каждый из этих разделов весит 9 МБ, но при загрузке в memory ( ddf.get_partition(0).compute().info()
) весит ~5GB
. Учитывая, что у меня есть около 400GB
ОЗУ, я бы предположил, что я могу загружать 80
разделы (возможно, меньше, учитывая накладные расходы на другие операции). На панели инструментов я вижу, что dask пытается выполнить все задачи одновременно (задачи в памяти всегда одинаковы, не имеет значения, сколько рабочих).
Я написал это, чтобы проверить время, необходимое для обработки раздела:
start = time.time()
df = ddf.get_partition(0).compute()
mask = df['data'] > 0
unique_true = df[mask]['id'].unique()
unique_false = df[~mask]['id'].unique()
print(time.time() - start)
Это занимает около 60s
и требует около 7GB
ОЗУ. Если я запускаю ProcessPool и предполагаю, что одновременно я запускаю только 50
разделы, это займет 4-5
несколько минут.
Я знаю, что ядро Dask в точности выполняет то, что я делал с одним разделом, поэтому мой вопрос заключается в том, почему Dask пытается выполнять все задачи параллельно, а не по одной за раз? Есть ли способ ограничить выполнение задачи? И это настоящая проблема здесь или я что-то упускаю?
Я нашел здесь несколько вопросов, чтобы ограничить выполнение задач. Все пункты здесь: https://distributed.dask.org/en/latest/resources.html . Однако я считаю, что мне не следует форсировать это поведение и позволить Dask сделать все возможное. Я должен также упомянуть, что Dask может запускать код при настройке 5 рабочих в одном потоке с 80 ГБ оперативной памяти для каждого (но это занимает намного больше времени, чем при использовании метода пула процессов, о котором я упоминал).
Я работаю на python 3.6.10
и Dask 2.17.2
.