Dask — Как сэкономить память, например, частично прочитав csv?

#pandas #dask

#pandas #dask

Вопрос:

У меня есть два файла csv. Один основной файл file1 и один file2 со строками, которые необходимо удалить file1 . После удаления этих строк будет выполнен groupby с mainnumber сохранением суммы и количества.

К сожалению, мои наборы данных кажутся слишком большими для моей 32 ГБ памяти, поскольку процесс остановлен. У меня доступен только этот сервер, других работников нет.

Есть ли какие-либо возможности оптимизировать мой код с точки зрения памяти? Может быть, читая file2.csv по частям?

 import pandas as pd
import dask.dataframe as dd

def custom_cut(partition, bins, labels):
    result = pd.cut(x=partition["mainnumber"], bins=bins, labels=labels)
    return result

colnames=['mainnumber', 'number', 'index', 'amount'] 
dfone = dd.read_csv('file1.csv', names=colnames, header=0, dtype={'mainnumber': 'Int64', 'number': 'Int64', 'index': 'Int64', 'amount': 'Int64'})

colnames=['mainnumber', 'number', 'index'] 
dftwo = dd.read_csv('file2.csv', names=colnames, header=None, dtype={'mainnumber': 'Int64', 'number': 'Int64', 'index': 'Int64'})

dftwo = dftwo[dftwo["index"] < 10000] #Remove some data

dfnew  = dd.merge(dfone, dftwo, how='left', indicator='Exist')
del dfone
del dftwo
dfnew  = dfnew.loc[dfnew ['Exist'] != 'both']
dfnew = dfnew.drop(columns=['Exist'])

dfnew = (dfnew.groupby('mainnumber')['amount'].agg(['sum', 'count']).reset_index())

#Some example bins:
dfnew = dfnew.groupby(dfnew.map_partitions(custom_cut,               
    bins=[0,1000,2000], 
    labels=['first', 'second']))[['sum', 'count']].sum().reset_index()

dfnew = dfnew.compute()

#Write some values to database
  

Ответ №1:

Для экономии памяти я бы использовал, usecols и memory_limit параметры dask.read_csv . Также подумайте, нужен ли вам int64 или вы могли бы использовать, например, uint32. Другой вариант — использовать категориальные значения, за исключением поля «сумма».

 dtype = {'mainnumber': 'category', 'number': 'category', 'index': 'category', 'amount': 'int64'}
dfone = dd.read_csv('file1.csv', names=colnames, usecols=colnames, header=0, blocksize="4GB", dtype=dtype)
  

Чтобы сэкономить еще больше памяти, вы можете попробовать выполнить слияние напрямую:

 dfnew  = dd.merge(
    dd.read_csv('file1.csv', names=colnames, usecols=colnames, header=0, blocksize="4GB", dtype=dtype),
    dd.read_csv('file2.csv', names=colnames, usecols=colnames, header=0, blocksize="4GB", dtype=dtype),
    how='left', indicator='Exist')
  

Затем вы можете удалить индексы на месте, чтобы избежать создания копии:

 indexes_to_drop = dfnew.index[dfnew['Exist'] != 'both']
dfnew.drop(index=indexes_to_drop, inplace=True)
dfnew.drop(column='Exist', inplace=True)
  

Комментарии:

1. Спасибо! Какое значение вы посоветуете memory_limit , поскольку у меня нет других запущенных задач и доступно 32 ГБ? Я думаю how='inner' , что это неправильно? Я пытаюсь удалить все значения из dftwo в dfone и сохранить результат в dfnew.

2. @Scripter вы правы, вам нужно выполнить левое соединение. Для memory_limit я бы попробовал максимально возможное, попробуйте «8GB» или «4GB».

3. Спасибо. Я не рассматривал прямое слияние из-за dftwo = dftwo[dftwo["index"] < 10000] , поскольку dfone содержит только index значения ниже 10000. Я думаю, что это создаст строки, dfnew если я не удалю их перед слиянием? В чем смысл использования переменной indexes_to_drop ? Разве это не должно быть dfnew или вообще не должно быть переменной?

4. read_csv к сожалению, не имеет аргумента memory_limit ( TypeError: parser_f() got an unexpected keyword argument 'memory_limit' ) .

5. Хорошо, извините, правильный параметр blocksize . Я исправлю ответ. В любом случае, я бы попытался использовать категории, если они подходят только как фрейм данных pandas, прежде чем использовать dask.