«TypeError: объект ‘type’ не подлежит подписке» при выполнении многопроцессорной обработки. Что я делаю не так?

#python #pandas #performance #dataframe #multiprocessing

#python #pandas #Производительность #фрейм данных #многопроцессорная обработка

Вопрос:

Я пытаюсь «мульти» обработать функцию func , но всегда получаю эту ошибку:

 File "c:...programspythonpython37libmultiprocessingpool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()

  File "c:...programspythonpython37libmultiprocessingpool.py", line 657, in get
    raise self._value

TypeError: 'type' object is not subscriptable
  

что я делаю не так? every job — это словарь, содержащий все необходимые параметры для func

минимальный воспроизводимый образец:

 import multiprocessing as mp,pandas as pd
def func(name, raw_df=pd.DataFrame, df={}, width=0):
    # 3. do some column operations. (actually theres more than just this operation)  
    seriesF =  raw_df[[name]].dropna()
    afterDropping_indices = seriesF.index.copy(deep=True) 
    list_ = list(raw_df[name])[width:]  
    df[name]=pd.Series(list_.copy(), index=afterDropping_indices[width:]) 
       
def preprocess_columns(raw_df ):
 
    # get all inputs.
    df, width = {}, 137 
    args = {"raw_df":raw_df, "df":df, 'width': width }  
    column_names = raw_df.columns

    # get input-dict for every single job.
    jobs=[]
    for i in range(len(column_names)):
        job = {"name":column_names[i]}
        job.update(args) 
        jobs.append(job) 

    # mutliprocessing
    pool = mp.Pool(len(column_names))  
    pool.map(func, jobs)    
    
    # create df from dict and reindex 
    df=pd.concat(df,axis=1) 
    df=df.reindex(df.index[::-1])
    return df 

if __name__=='__main__': 
    raw_df = pd.DataFrame({"A":[ 1.1 ]*100000, "B":[ 2.2 ]*100000, "C":[ 3.3 ]*100000}) 
    raw_df = preprocess_columns(raw_df ) 
  

РЕДАКТИРОВАТЬ: версия, в которой передается только столбец вместо raw_df

 import multiprocessing as mp,pandas as pd
def func(name, series, df, width):
    # 3. do some column operations. (actually theres more than just this operation)  
    seriesF =  series.dropna()
    afterDropping_indices = seriesF.index.copy(deep=True) 
    list_ = list(series)[width:]  
    df[name]=pd.Series(list_.copy(), index=afterDropping_indices[width:]) 
       
def preprocess_columns(raw_df ):
 
    df, width = {}, 137 
    args = {"df":df, 'width': width } 
     
    column_names = raw_df.columns
    jobs=[]
    for i in range(len(column_names)):
        job = {"name":column_names[i], "series":raw_df[column_names[i]]}
        job.update(args)  
        jobs.append(job)
    
    pool = mp.Pool(len(column_names))  
    pool.map(func, jobs)    
    
    # create df from dict and reindex 
    df=pd.concat(df,axis=1) 
    df=df.reindex(df.index[::-1])
    return df 

if __name__=='__main__': 
    raw_df = pd.DataFrame({"A":[ 1.1 ]*100000, "B":[ 2.2 ]*100000, "C":[ 3.3 ]*100000}) 
    raw_df = preprocess_columns(raw_df ) 
  

это приводит к:

 TypeError: func() missing 3 required positional arguments: 'series', 'df', and 'width'
  

Комментарии:

1. raw_df=pd.DataFrame не имеет смысла. Вашим работникам нужен фактический фрейм данных, а не pd.DataFrame . (На самом деле, им действительно нужен только столбец, с которым они собираются работать, и вам следует изменить свой код, чтобы передавать только этот столбец, чтобы уменьшить затраты на взаимодействие между процессами.)

2. @user2357112supportsMonica пожалуйста, извините, я забыл, что я поместил это ключевое слово там, прежде чем отправлять вопрос. к сожалению, ключевые слова не являются причиной ошибки. Ваше предложение о передаче только столбца звучит довольно приятно, но нет ли способа сделать это, передав только имя в качестве элемента, для которого будет происходить распараллеливание?

3. Отредактированный код выдал бы совершенно другую ошибку.

4. @user2357112supportsMonica не были бы вы так добры рассказать мне, что я делаю не так? (отредактировано снова). к предыдущему комментарию: raw_df был в args словаре

Ответ №1:

Я нашел решение: Обобщил:

  1. добавлена функция expand_call() (смотрите ниже, что она делает).
  2. повторение выходного результата и добавление элементов в обычный список.

Внимание: это касается только нескольких потоков.

 
import multiprocessing as mp,pandas as pd
def func(name, raw_df, df, width):
    # 3. do some column operations. (actually theres more than just this operation)  
    seriesF =  raw_df[name].dropna()
    afterDropping_indices = seriesF.index.copy(deep=True) 
    list_ = list(raw_df[name])[width:]  
    df[name]=pd.Series(list_.copy(), index=afterDropping_indices[width:])  
    df[name].name = name
    return df

def expandCall(kargs): 
    # Expand the arguments of a callback function, kargs[’func’] 
    func=kargs['func'] 
    del kargs['func']  
    out=func(**kargs)  
    return out
 
def preprocess_columns(raw_df ): 
    df, width = pd.DataFrame(), 137
    args = {"df":df, "raw_df":raw_df, 'width': width }
     
    column_names = raw_df.columns
    jobs=[]
    for i in range(len(column_names)):
        job = {"func":func,"name":column_names[i]}
        job.update(args)
        jobs.append(job)
    
    pool = mp.Pool(len(column_names))
    task=jobs[0]['func'].__name__
    outputs= pool.imap_unordered(expandCall, jobs)
    
    out = [];  
    for i,out_ in enumerate(outputs,1):
        out.append(out_)  
    pool.close(); pool.join() # this is needed to prevent memory leaks return out
      
    # create df from dict and reindex
    df=pd.concat(out,axis=1)  
    df=df.reindex(df.index[::-1]) 
    print(df)
    return df 

if __name__=='__main__': 
    raw_df = pd.DataFrame({"A":[ 1.1 ]*100000, "B":[ 2.2 ]*100000, "C":[ 3.3 ]*100000}) 
    raw_df = preprocess_columns(raw_df )