Сохранение выходных данных в переменную между многопроцессорными процессами

#python #parallel-processing #multiprocessing

Вопрос:

Я ищу простое решение, которое сохраняет выходные данные в один файл или в одну переменную, когда я использую многопроцессорную обработку.

мой вывод-это список, когда я пытаюсь объединить все в конце моей многопроцессорной обработки, возникает ошибка

Другой вариант-сохранять файл на каждом шаге функции вывода, но это занимает много памяти

Есть ли способ сохранить этот список между многопроцессорными процессами?

 def calculate(data,ylat,xlon):
    output = []
    for j,i in data:
       ...
       output.append(lat,lon,area,fraction_area)
    L.append(output)
    print(lat,lon,area,fraction_area,file=f)
return output

# Multiprocessing
# number of polygons are 1200000
f=open('name.txt','w')
with mp.Manager() as manager:
   L = manager.list()
   pool = mp.Pool() 
   for index,polys in area_study.iterrows():
      # Limits each polygon in shapefile     
      ylat = [ymin,ymax]
      xlon= [xmin,xmax]
      args.append((polys,ylat,xlon))

   p=pool.starmap(calculate,args) 
   pool.close()
   pool.join()  

 

Комментарии:

1. Если вы хотите поделиться файлом/переменной между потоками, я считаю, что вы должны использовать семафоры , чтобы разрешить одному потоку доступ к переменной/файлу, заблокировать его, а затем освободить, чтобы другие потоки могли получить доступ

2. попробуйте использовать генераторы!!

3. Где именно я могу использовать семафоры?? .. как я могу использовать генераторы??

4.Почему вы добавляете output в управляемый список, когда возвращаемое значение pool.starmap будет обычным list с тем же содержимым (при условии, что вы исправите ошибку отступа). Если

Ответ №1:

См.Мой комментарий выше о добавлении в управляемый список, которое кажется ненужной операцией.

Я бы использовал функцию генератора, которая выдает последовательные (polys, ylat, xlon) кортежи в сочетании с Pool.imap методом (и эффективным аргументом размера фрагмента), который может лениво генерировать все аргументы по ходу работы и обрабатывать возвращаемые значения при compute записи результатов основного процесса по мере их поступления. Это общая идея, которую вы можете адаптировать по мере необходимости.

Обратите внимание, что в вашей функции calculate этот оператор output.append(lat,lon,area,fraction_area) недопустим, так append как принимает только один аргумент. Я предполагаю, что в приведенном ниже коде вы пытаетесь добавить кортеж (или список):

 import multiprocessing as mp

def calculate(t):
    data, ylat, xlon = t # unpack
    output = []
    for j, i in data:
       ...
       output.append((lat, lon, area, fraction_area))
    return output

def compute_chunksize(poolsize, iterable_size): 
    chunksize, remainder = divmod(iterable_size, 4 * poolsize)
    if remainder:
        chunksize  = 1
    return chunksize

def generate_polygons():
    for index, polys in area_study.iterrows():
       # Limits each polygon in shapefile     
       ylat = [ymin,ymax]
       xlon = [xmin,xmax]
       yield polys, ylat, xlon

# Required for Windows
if __name__ == '__main__':
    # number of polygons is 1200000
    poolsize = mp.cpu_count()
    # Best guess as to the size of the iterable being passed to imap:
    iterable_size = 1_200_000
    chunksize = compute_chunksize(poolsize, iterable_size)
    with open('name.txt', w) as f:
        pool = mp.Pool(poolsize)
        # Each result is a list of tuples of lat, lon, area, fraction_area
        for result in pool.imap(calculate, generate_polygons(), chunksize=chunksize):
            for t in result:
                print(*t, file=f)
                """
                lat, lon, area, fraction_area = t # unpack
                print(lat, lon, area, fraction_area, file=f)
                """
        pool.close()
        pool.join()