#python #pandas #performance #dataframe #multiprocessing
#python #pandas #Производительность #фрейм данных #многопроцессорная обработка
Вопрос:
Я пытаюсь «мульти» обработать функцию func
, но всегда получаю эту ошибку:
File "c:...programspythonpython37libmultiprocessingpool.py", line 268, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "c:...programspythonpython37libmultiprocessingpool.py", line 657, in get
raise self._value
TypeError: 'type' object is not subscriptable
что я делаю не так? every job
— это словарь, содержащий все необходимые параметры для func
минимальный воспроизводимый образец:
import multiprocessing as mp,pandas as pd
def func(name, raw_df=pd.DataFrame, df={}, width=0):
# 3. do some column operations. (actually theres more than just this operation)
seriesF = raw_df[[name]].dropna()
afterDropping_indices = seriesF.index.copy(deep=True)
list_ = list(raw_df[name])[width:]
df[name]=pd.Series(list_.copy(), index=afterDropping_indices[width:])
def preprocess_columns(raw_df ):
# get all inputs.
df, width = {}, 137
args = {"raw_df":raw_df, "df":df, 'width': width }
column_names = raw_df.columns
# get input-dict for every single job.
jobs=[]
for i in range(len(column_names)):
job = {"name":column_names[i]}
job.update(args)
jobs.append(job)
# mutliprocessing
pool = mp.Pool(len(column_names))
pool.map(func, jobs)
# create df from dict and reindex
df=pd.concat(df,axis=1)
df=df.reindex(df.index[::-1])
return df
if __name__=='__main__':
raw_df = pd.DataFrame({"A":[ 1.1 ]*100000, "B":[ 2.2 ]*100000, "C":[ 3.3 ]*100000})
raw_df = preprocess_columns(raw_df )
РЕДАКТИРОВАТЬ: версия, в которой передается только столбец вместо raw_df
import multiprocessing as mp,pandas as pd
def func(name, series, df, width):
# 3. do some column operations. (actually theres more than just this operation)
seriesF = series.dropna()
afterDropping_indices = seriesF.index.copy(deep=True)
list_ = list(series)[width:]
df[name]=pd.Series(list_.copy(), index=afterDropping_indices[width:])
def preprocess_columns(raw_df ):
df, width = {}, 137
args = {"df":df, 'width': width }
column_names = raw_df.columns
jobs=[]
for i in range(len(column_names)):
job = {"name":column_names[i], "series":raw_df[column_names[i]]}
job.update(args)
jobs.append(job)
pool = mp.Pool(len(column_names))
pool.map(func, jobs)
# create df from dict and reindex
df=pd.concat(df,axis=1)
df=df.reindex(df.index[::-1])
return df
if __name__=='__main__':
raw_df = pd.DataFrame({"A":[ 1.1 ]*100000, "B":[ 2.2 ]*100000, "C":[ 3.3 ]*100000})
raw_df = preprocess_columns(raw_df )
это приводит к:
TypeError: func() missing 3 required positional arguments: 'series', 'df', and 'width'
Комментарии:
1.
raw_df=pd.DataFrame
не имеет смысла. Вашим работникам нужен фактический фрейм данных, а неpd.DataFrame
. (На самом деле, им действительно нужен только столбец, с которым они собираются работать, и вам следует изменить свой код, чтобы передавать только этот столбец, чтобы уменьшить затраты на взаимодействие между процессами.)2. @user2357112supportsMonica пожалуйста, извините, я забыл, что я поместил это ключевое слово там, прежде чем отправлять вопрос. к сожалению, ключевые слова не являются причиной ошибки. Ваше предложение о передаче только столбца звучит довольно приятно, но нет ли способа сделать это, передав только имя в качестве элемента, для которого будет происходить распараллеливание?
3. Отредактированный код выдал бы совершенно другую ошибку.
4. @user2357112supportsMonica не были бы вы так добры рассказать мне, что я делаю не так? (отредактировано снова). к предыдущему комментарию:
raw_df
был вargs
словаре
Ответ №1:
Я нашел решение: Обобщил:
- добавлена функция expand_call() (смотрите ниже, что она делает).
- повторение выходного результата и добавление элементов в обычный список.
Внимание: это касается только нескольких потоков.
import multiprocessing as mp,pandas as pd
def func(name, raw_df, df, width):
# 3. do some column operations. (actually theres more than just this operation)
seriesF = raw_df[name].dropna()
afterDropping_indices = seriesF.index.copy(deep=True)
list_ = list(raw_df[name])[width:]
df[name]=pd.Series(list_.copy(), index=afterDropping_indices[width:])
df[name].name = name
return df
def expandCall(kargs):
# Expand the arguments of a callback function, kargs[’func’]
func=kargs['func']
del kargs['func']
out=func(**kargs)
return out
def preprocess_columns(raw_df ):
df, width = pd.DataFrame(), 137
args = {"df":df, "raw_df":raw_df, 'width': width }
column_names = raw_df.columns
jobs=[]
for i in range(len(column_names)):
job = {"func":func,"name":column_names[i]}
job.update(args)
jobs.append(job)
pool = mp.Pool(len(column_names))
task=jobs[0]['func'].__name__
outputs= pool.imap_unordered(expandCall, jobs)
out = [];
for i,out_ in enumerate(outputs,1):
out.append(out_)
pool.close(); pool.join() # this is needed to prevent memory leaks return out
# create df from dict and reindex
df=pd.concat(out,axis=1)
df=df.reindex(df.index[::-1])
print(df)
return df
if __name__=='__main__':
raw_df = pd.DataFrame({"A":[ 1.1 ]*100000, "B":[ 2.2 ]*100000, "C":[ 3.3 ]*100000})
raw_df = preprocess_columns(raw_df )