#pandas #dask
#pandas #dask
Вопрос:
У меня есть два файла csv. Один основной файл file1
и один file2
со строками, которые необходимо удалить file1
. После удаления этих строк будет выполнен groupby с mainnumber
сохранением суммы и количества.
К сожалению, мои наборы данных кажутся слишком большими для моей 32 ГБ памяти, поскольку процесс остановлен. У меня доступен только этот сервер, других работников нет.
Есть ли какие-либо возможности оптимизировать мой код с точки зрения памяти? Может быть, читая file2.csv
по частям?
import pandas as pd
import dask.dataframe as dd
def custom_cut(partition, bins, labels):
result = pd.cut(x=partition["mainnumber"], bins=bins, labels=labels)
return result
colnames=['mainnumber', 'number', 'index', 'amount']
dfone = dd.read_csv('file1.csv', names=colnames, header=0, dtype={'mainnumber': 'Int64', 'number': 'Int64', 'index': 'Int64', 'amount': 'Int64'})
colnames=['mainnumber', 'number', 'index']
dftwo = dd.read_csv('file2.csv', names=colnames, header=None, dtype={'mainnumber': 'Int64', 'number': 'Int64', 'index': 'Int64'})
dftwo = dftwo[dftwo["index"] < 10000] #Remove some data
dfnew = dd.merge(dfone, dftwo, how='left', indicator='Exist')
del dfone
del dftwo
dfnew = dfnew.loc[dfnew ['Exist'] != 'both']
dfnew = dfnew.drop(columns=['Exist'])
dfnew = (dfnew.groupby('mainnumber')['amount'].agg(['sum', 'count']).reset_index())
#Some example bins:
dfnew = dfnew.groupby(dfnew.map_partitions(custom_cut,
bins=[0,1000,2000],
labels=['first', 'second']))[['sum', 'count']].sum().reset_index()
dfnew = dfnew.compute()
#Write some values to database
Ответ №1:
Для экономии памяти я бы использовал, usecols
и memory_limit
параметры dask.read_csv
. Также подумайте, нужен ли вам int64 или вы могли бы использовать, например, uint32. Другой вариант — использовать категориальные значения, за исключением поля «сумма».
dtype = {'mainnumber': 'category', 'number': 'category', 'index': 'category', 'amount': 'int64'}
dfone = dd.read_csv('file1.csv', names=colnames, usecols=colnames, header=0, blocksize="4GB", dtype=dtype)
Чтобы сэкономить еще больше памяти, вы можете попробовать выполнить слияние напрямую:
dfnew = dd.merge(
dd.read_csv('file1.csv', names=colnames, usecols=colnames, header=0, blocksize="4GB", dtype=dtype),
dd.read_csv('file2.csv', names=colnames, usecols=colnames, header=0, blocksize="4GB", dtype=dtype),
how='left', indicator='Exist')
Затем вы можете удалить индексы на месте, чтобы избежать создания копии:
indexes_to_drop = dfnew.index[dfnew['Exist'] != 'both']
dfnew.drop(index=indexes_to_drop, inplace=True)
dfnew.drop(column='Exist', inplace=True)
Комментарии:
1. Спасибо! Какое значение вы посоветуете
memory_limit
, поскольку у меня нет других запущенных задач и доступно 32 ГБ? Я думаюhow='inner'
, что это неправильно? Я пытаюсь удалить все значения из dftwo в dfone и сохранить результат в dfnew.2. @Scripter вы правы, вам нужно выполнить левое соединение. Для memory_limit я бы попробовал максимально возможное, попробуйте «8GB» или «4GB».
3. Спасибо. Я не рассматривал прямое слияние из-за
dftwo = dftwo[dftwo["index"] < 10000]
, посколькуdfone
содержит толькоindex
значения ниже 10000. Я думаю, что это создаст строки,dfnew
если я не удалю их перед слиянием? В чем смысл использования переменнойindexes_to_drop
? Разве это не должно быть dfnew или вообще не должно быть переменной?4.
read_csv
к сожалению, не имеет аргументаmemory_limit
(TypeError: parser_f() got an unexpected keyword argument 'memory_limit'
) .5. Хорошо, извините, правильный параметр
blocksize
. Я исправлю ответ. В любом случае, я бы попытался использовать категории, если они подходят только как фрейм данных pandas, прежде чем использовать dask.