#python #pandas #dataframe #python-multiprocessing
#python #pandas #фрейм данных #python-многопроцессорная обработка
Вопрос:
У меня есть фрейм данных с расстоянием, и я хочу проверить это расстояние и применить некоторые функции в зависимости от значений dstances, я пытаюсь сделать это с помощью многопроцессорной обработки, я создал фрейм данных с тем же размером, что и исходный фрейм данных, но он имеет все значения нулей следующим образом:
df_final=pd.DataFrame(0, index=np.arange(len(df)), columns=df.columns.tolist())
затем я написал generate
функцию, которая должна вызываться многопроцессорной обработкой следующим образом:
def generate(df_plus_id):
#df = df_plus_id[0]
#df_final = df_plus_id[1]
print("in dunc")
print(df_final['NN_x1'].values.tolist()[0])
for i in Regions:
for j in range(df.shape[0]):
#for k in range(i "_Edges_Count"):
edge_count=df[i "_Edges_Count"].iloc[j]
new_edge_count=df[i "_Edges_Count"].iloc[j]
#print("edge count is",str(edge_count))
for k in range(1,edge_count 1):
#print("k is",str(k))
if (df[i "_D" str(k)].iloc[j]<=1.4):
df_final[i "_D" str(k)].iloc[j]=df[i "_D" str(k)].iloc[j]
print("in if")
else:
#print("in else")
#print(df[i "_D" str(k)].iloc[j])
#df_final[i "_D" str(k)].iloc[j]=0
new_edge_count=k-1
break
for c in range(1,new_edge_count 1):
#print("new edges count",str(new_edge_count))
#print("c",str(c))
#print("i",str(i))
for feature in Features:
#print("feature is",str(feature))
#print("j is",str(j))
df_final[i feature str(c)].iloc[j]=df[i feature str(c)].iloc[j]
#print("loc is",str(df_final[[i feature str(c)]].iloc[j]))
df_final[i "_Edges_Count"].iloc[j]=new_edge_count
if df_plus_id[2] == 0:
df_final.to_csv('temporary_saved_files/' str(df_plus_id[2]) '.csv', index=False)
else:
df_final.to_csv('temporary_saved_files/' str(df_plus_id[2]) '.csv', index=False, header=False)
и код в main, который вызывает функцию генерации, является:
import multiprocessing as mp
import math
number_of_CPU = "Max"
if type(number_of_CPU) == int and number_of_CPU < mp.cpu_count() and number_of_CPU > 0 :
processes = number_of_CPU
else:
processes = mp.cpu_count()
# if number of processess > number of lines in df, make processses= number of lines.
if processes > df.shape[0]:
processes = df.shape[0]
print('Generating the new training data using %s'%str(processes) ' processes.')
# Determine chunk size, which is the number of lines per processes
list_of_dfs = []
chunk_size = int(math.ceil(df.shape[0] / processes))
start = 0
end = chunk_size
# Divide the dataframe into chunks
for i in range(processes):
if start >= df.shape[0]:
# The ceiling could make a special case where the number of chunks < number of processses
# Which will produce empty dataframe per process.
# Ex 4 processes and 6 rows.
break
list_of_dfs.append([df.iloc[start:end, :],df_final.iloc[start:end, :],i])
start = end
end = end chunk_size
processes = len(list_of_dfs)
# Make temporary directory for intermediate results
os.mkdir('temporary_saved_files')
with mp.Pool(processes) as pool: # or whatever your hardware can support
pool.map(generate, list_of_dfs)
# command = 'cat temporary_saved_files/*.csv >' saving_directory
# print(command)
os.popen('cat temporary_saved_files/*.csv >' './sample.csv').read()
print("done concat")
os.popen('rm -r temporary_saved_files').read()
Однако df_final по-прежнему содержит все нули даже после вызова функции генерации, я не знаю, почему? несмотря на то, что, когда я применяю строки в функции генерации последовательно без многопроцессорной обработки, df_final имеет ожидаемые значения. Есть идеи, почему функция generate не записывает в df_final?
Комментарии:
1. Я не видел кода, который может передаваться
df_final
другим процессам. Так что, я думаю, вы этого не делали. Когда вы используетеmultiprocess
, у вас много процессов. Они не передают никаких данных, если вы не сделаете что-то, чтобы помочь. Они просто изменяют свои собственныеdf_final
, поэтомуdf_final
основной процесс никогда не изменяется.2. спасибо за ваш комментарий, на самом деле в моем реальном коде я пытался написать df_final, я записываю df_final каждого процесса в файл csv во временной папке, а затем объединяю их в конце. Тем не менее, у меня все еще есть нули в конечном объединенном файле csv
3. Вы пробовали использовать Dask ?
4. Нет, но почему? Раньше я делал это с помощью mp, но я не знаю, почему на этот раз он не работает?