Многопроцессорная функция не обновляет фрейм данных в python

#python #pandas #dataframe #python-multiprocessing

#python #pandas #фрейм данных #python-многопроцессорная обработка

Вопрос:

У меня есть фрейм данных с расстоянием, и я хочу проверить это расстояние и применить некоторые функции в зависимости от значений dstances, я пытаюсь сделать это с помощью многопроцессорной обработки, я создал фрейм данных с тем же размером, что и исходный фрейм данных, но он имеет все значения нулей следующим образом:

 df_final=pd.DataFrame(0, index=np.arange(len(df)), columns=df.columns.tolist())
 

затем я написал generate функцию, которая должна вызываться многопроцессорной обработкой следующим образом:

 def generate(df_plus_id):
  #df = df_plus_id[0]
  #df_final =  df_plus_id[1]
  print("in dunc")
  print(df_final['NN_x1'].values.tolist()[0])
  for i in Regions:
    for j in range(df.shape[0]):
    
      #for k in range(i "_Edges_Count"):
        edge_count=df[i "_Edges_Count"].iloc[j]
        new_edge_count=df[i "_Edges_Count"].iloc[j]
        #print("edge count is",str(edge_count))
        for k in range(1,edge_count 1):
          #print("k is",str(k))
          if (df[i "_D" str(k)].iloc[j]<=1.4):
            df_final[i "_D" str(k)].iloc[j]=df[i "_D" str(k)].iloc[j]
            print("in if")



            
          





          else:
            #print("in else")
            #print(df[i "_D" str(k)].iloc[j])
            #df_final[i "_D" str(k)].iloc[j]=0


            new_edge_count=k-1
            break

          
        for c in range(1,new_edge_count 1):
            #print("new edges count",str(new_edge_count))
            #print("c",str(c))
            #print("i",str(i))

            for feature in Features:
                #print("feature is",str(feature))
                #print("j is",str(j))
                df_final[i feature str(c)].iloc[j]=df[i feature str(c)].iloc[j]
                #print("loc is",str(df_final[[i feature str(c)]].iloc[j]))
            


            
        df_final[i "_Edges_Count"].iloc[j]=new_edge_count
        if df_plus_id[2] == 0:
         df_final.to_csv('temporary_saved_files/'   str(df_plus_id[2])   '.csv', index=False)
        else:
         df_final.to_csv('temporary_saved_files/'   str(df_plus_id[2])   '.csv', index=False, header=False)     
 

и код в main, который вызывает функцию генерации, является:

 import multiprocessing as mp
import math
number_of_CPU = "Max"
if  type(number_of_CPU) == int and number_of_CPU < mp.cpu_count() and number_of_CPU > 0 :
    processes = number_of_CPU
else:
    processes = mp.cpu_count()

# if number of processess > number of lines in df, make processses= number of lines.
if processes > df.shape[0]:
    processes = df.shape[0]
print('Generating the new training data using %s'%str(processes)  ' processes.')
# Determine chunk size, which is the number of lines per processes
list_of_dfs = []
chunk_size = int(math.ceil(df.shape[0] / processes))
start = 0
end = chunk_size

# Divide the dataframe into chunks
for i in range(processes):

    if start >= df.shape[0]:
        # The ceiling could make a special case where the number of chunks < number of processses
        # Which will produce empty dataframe per process.
        # Ex 4 processes and 6 rows.
        break
    list_of_dfs.append([df.iloc[start:end, :],df_final.iloc[start:end, :],i])
    start = end
    end = end   chunk_size


processes = len(list_of_dfs)
# Make temporary directory for intermediate results

os.mkdir('temporary_saved_files')

with mp.Pool(processes) as pool:  # or whatever your hardware can support
    pool.map(generate, list_of_dfs)
# command = 'cat temporary_saved_files/*.csv  >' saving_directory
# print(command)
os.popen('cat temporary_saved_files/*.csv  >'   './sample.csv').read()
print("done concat")
os.popen('rm -r temporary_saved_files').read()

 

Однако df_final по-прежнему содержит все нули даже после вызова функции генерации, я не знаю, почему? несмотря на то, что, когда я применяю строки в функции генерации последовательно без многопроцессорной обработки, df_final имеет ожидаемые значения. Есть идеи, почему функция generate не записывает в df_final?

Комментарии:

1. Я не видел кода, который может передаваться df_final другим процессам. Так что, я думаю, вы этого не делали. Когда вы используете multiprocess , у вас много процессов. Они не передают никаких данных, если вы не сделаете что-то, чтобы помочь. Они просто изменяют свои собственные df_final , поэтому df_final основной процесс никогда не изменяется.

2. спасибо за ваш комментарий, на самом деле в моем реальном коде я пытался написать df_final, я записываю df_final каждого процесса в файл csv во временной папке, а затем объединяю их в конце. Тем не менее, у меня все еще есть нули в конечном объединенном файле csv

3. Вы пробовали использовать Dask ?

4. Нет, но почему? Раньше я делал это с помощью mp, но я не знаю, почему на этот раз он не работает?