#python #pandas #dataframe #multiprocessing
#python #pandas #фрейм данных #многопроцессорная обработка
Вопрос:
Я пытаюсь выполнить несколько операций с фреймом данных; один из них — проверить значение одного столбца, если оно больше 3, это значение превращается в ноль. Код, который я написал, работал нормально последовательно, и результат был таким, как ожидалось. Однако, когда я попытался парализовать его, используя многопроцессорную библиотеку, результат был не таким, как ожидалось. После некоторых размышлений я обнаружил, что обрабатывается только последняя строка каждого файла csv (фрагмента) в temporary_saved_files2
(перед их объединением и удалением папки, как это будет показано в коде), однако все строки остаются нулями. это код:
df_final=pd.DataFrame(0, index=np.arange(len(df)), columns=df.columns.tolist())
#df_final=pd.DataFrame()
def generate(df_plus_id):
df = df_plus_id[0]
df_final = df_plus_id[1]
## my operationsss and processing
if df_plus_id[2] == 0:
df_final.to_csv('temporary_saved_files2/' str(df_plus_id[2]) '.csv', index=False)
else:
df_final.to_csv('temporary_saved_files2/' str(df_plus_id[2]) '.csv', index=False, header=False)
и вот код, который вызывает функцию генерации:
import multiprocessing as mp
number_of_CPU = "Max"
if type(number_of_CPU) == int and number_of_CPU < mp.cpu_count() and number_of_CPU > 0 :
processes = number_of_CPU
else:
processes = mp.cpu_count()
# if number of processess > number of lines in df, make processses= number of lines.
if processes > df.shape[0]:
processes = df.shape[0]
print('Generating the new training data using %s'%str(processes) ' processes.')
# Determine chunk size, which is the number of lines per processes
list_of_dfs = []
chunk_size = int(math.ceil(df.shape[0] / processes))
start = 0
end = chunk_size
# Divide the dataframe into chunks
for i in range(processes):
if start >= df.shape[0]:
break
list_of_dfs.append([df.iloc[start:end, :],df_final.iloc[start:end, :],i])
start = end
end = end chunk_size
processes = len(list_of_dfs)
# Make temporary directory for intermediate results
os.mkdir('temporary_saved_files2')
with mp.Pool(processes) as pool: # or whatever your hardware can support
pool.map(generate, list_of_dfs)
os.popen('cat temporary_saved_files2/*.csv >' 'trial').read()
os.popen('rm -r temporary_saved_files2').read()
Есть идеи, почему это так?