Dask — как эффективно выполнить нужное количество задач

#python #dask #dask-distributed #dask-dataframe

#python #dask #dask-распределенный #dask-dataframe

Вопрос:

Я пытаюсь замаскировать, а затем применить unique операцию к одному столбцу. Упрощенная версия кода, который я использую, приведена ниже:

 import numpy as np
import pandas as pd
import dask.dataframe as dd

data = np.random.randint(0,100,(1000,2))
ddf = dd.from_pandas(pd.DataFrame(data, columns = ['data','id']), npartitions = 2)

mask = ddf['data'] > 0
unique_false = ddf[~mask]['id'].unique()
unique_true = ddf[mask]['id'].unique()

results = dask.compute([unique_true, unique_false])
 

Этот быстрый пример работает нормально. Мои реальные данные состоят из ~5000 столбцов, где один столбец используется для фильтрации, а другой — для получения уникальных идентификаторов. Данные хранятся в 200 паркетных разделах, каждый из этих разделов весит 9 МБ, но при загрузке в memory ( ddf.get_partition(0).compute().info() ) весит ~5GB . Учитывая, что у меня есть около 400GB ОЗУ, я бы предположил, что я могу загружать 80 разделы (возможно, меньше, учитывая накладные расходы на другие операции). На панели инструментов я вижу, что dask пытается выполнить все задачи одновременно (задачи в памяти всегда одинаковы, не имеет значения, сколько рабочих).

Я написал это, чтобы проверить время, необходимое для обработки раздела:

 start = time.time()

df = ddf.get_partition(0).compute()

mask = df['data'] > 0

unique_true = df[mask]['id'].unique()
unique_false = df[~mask]['id'].unique()

print(time.time() - start)
 

Это занимает около 60s и требует около 7GB ОЗУ. Если я запускаю ProcessPool и предполагаю, что одновременно я запускаю только 50 разделы, это займет 4-5 несколько минут.

Я знаю, что ядро Dask в точности выполняет то, что я делал с одним разделом, поэтому мой вопрос заключается в том, почему Dask пытается выполнять все задачи параллельно, а не по одной за раз? Есть ли способ ограничить выполнение задачи? И это настоящая проблема здесь или я что-то упускаю?

Я нашел здесь несколько вопросов, чтобы ограничить выполнение задач. Все пункты здесь: https://distributed.dask.org/en/latest/resources.html . Однако я считаю, что мне не следует форсировать это поведение и позволить Dask сделать все возможное. Я должен также упомянуть, что Dask может запускать код при настройке 5 рабочих в одном потоке с 80 ГБ оперативной памяти для каждого (но это занимает намного больше времени, чем при использовании метода пула процессов, о котором я упоминал).

Я работаю на python 3.6.10 и Dask 2.17.2 .