многопроцессорная запись csv записывает только в последний индекс каждого фрагмента строк

#python #pandas #dataframe #multiprocessing

#python #pandas #фрейм данных #многопроцессорная обработка

Вопрос:

Я пытаюсь выполнить несколько операций с фреймом данных; один из них — проверить значение одного столбца, если оно больше 3, это значение превращается в ноль. Код, который я написал, работал нормально последовательно, и результат был таким, как ожидалось. Однако, когда я попытался парализовать его, используя многопроцессорную библиотеку, результат был не таким, как ожидалось. После некоторых размышлений я обнаружил, что обрабатывается только последняя строка каждого файла csv (фрагмента) в temporary_saved_files2 (перед их объединением и удалением папки, как это будет показано в коде), однако все строки остаются нулями. это код:

 df_final=pd.DataFrame(0, index=np.arange(len(df)), columns=df.columns.tolist())
#df_final=pd.DataFrame()
def generate(df_plus_id):
    df = df_plus_id[0]
    df_final =  df_plus_id[1]
    ## my operationsss and processing

   if df_plus_id[2] == 0:
        df_final.to_csv('temporary_saved_files2/'   str(df_plus_id[2])   '.csv', index=False)
    else:
        df_final.to_csv('temporary_saved_files2/'   str(df_plus_id[2])   '.csv', index=False, header=False)
 

и вот код, который вызывает функцию генерации:

 import multiprocessing as mp

number_of_CPU = "Max"
if  type(number_of_CPU) == int and number_of_CPU < mp.cpu_count() and number_of_CPU > 0 :
    processes = number_of_CPU
else:
    processes = mp.cpu_count()

# if number of processess > number of lines in df, make processses= number of lines.
if processes > df.shape[0]:
    processes = df.shape[0]
print('Generating the new training data using %s'%str(processes)  ' processes.')
# Determine chunk size, which is the number of lines per processes
list_of_dfs = []
chunk_size = int(math.ceil(df.shape[0] / processes))
start = 0
end = chunk_size

# Divide the dataframe into chunks
for i in range(processes):

    if start >= df.shape[0]:
                break
    list_of_dfs.append([df.iloc[start:end, :],df_final.iloc[start:end, :],i])
    start = end
    end = end   chunk_size


processes = len(list_of_dfs)
# Make temporary directory for intermediate results
os.mkdir('temporary_saved_files2')

with mp.Pool(processes) as pool:  # or whatever your hardware can support
    pool.map(generate, list_of_dfs)
os.popen('cat temporary_saved_files2/*.csv  >'   'trial').read()
os.popen('rm -r temporary_saved_files2').read()

 

Есть идеи, почему это так?