Объединение выходных данных многопроцессорных рабочих — python2

#python #multiprocessing

#python #многопроцессорная обработка

Вопрос:

У меня есть [абстрагированный] сценарий многопроцессорной обработки ниже, где я пытаюсь:

1) разделить рабочую нагрузку между двумя процессами (добавить 1 к каждой переменной в списке добавить новую переменную в новый список)

2) Объедините выходные данные обоих процессов в новый глобальный список для дальнейшей обработки.

Есть идеи о том, как я могу использовать выходные данные обоих процессов и объединить эти выходные данные в глобальный список? То, что я хочу получить после выполнения, это:

список новых идентификаторов = [2, 4, 6, 8, 10, 3, 5, 7, 9, 11] # new_id_list от worker1 new_id_list от worker2

 #python2
from multiprocessing import Process, Queue
from time import sleep

#records to process
id_list = [1,2,3,4,5,6,7,8,9,10]

#new output id list
new_id_list = []

queue = Queue()

def mp_worker(queue):

    while queue.qsize() >0 :
        record = queue.get()
        new_id_list.append(record 1)
        sleep(.1)
    print(new_id_list)
    ###how would I go about passing this new_id_list as the global variable
    print("worker closed")

def mp_handler():

    # Spawn two processes, assigning the method to be executed 
    # and the input arguments (the queue)
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(2)]

    for process in processes:
        process.start()
        print('Process started')

    for process in processes:
        process.join()



if __name__ == '__main__':

    for id in id_list:
        queue.put(id)
    mp_handler()
  

Ответ №1:

Я предполагаю, что проблема, с которой вы столкнулись, заключалась в невозможности совместного использования обоих процессов new_id_list .

Что вам нужно сделать, так это создать другой, Queue который будет представлять очередь результатов и передавать ее обоим процессам. Добавляйте в очередь по мере необходимости внутри процессов, и в конце выполнения обоих процессов (после process.join() ) вы просто извлекаете все из очереди в список.

Комментарии:

1. в итоге я сделал что-то подобное, используя многопроцессорную обработку. Manager(), затем создание списка с помощью менеджера.

Ответ №2:

Нашел здесь статью о решении.

Рабочий код ниже. В основном:

1) мы используем многопроцессорную обработку.Менеджер()

2) сгенерируйте список с помощью Manager

3) Передайте список каждому рабочему, затем попросите каждого рабочего добавить выходные данные обратно в список.

 from multiprocessing import Process, Queue
from time import sleep
import multiprocessing

#records to process
id_list = [1,2,3,4,5,6,7,8,9,10]

#new output id list
new_id_list = []

queue = Queue()

def mp_worker(d,queue):

    while queue.qsize() >0 :
        record = queue.get()
        new_id_list.append(record 1)
        d.append(record 1)
        sleep(.1)
    print(new_id_list)  
    print("worker closed")

def mp_handler():

    # Spawn two processes, assigning the method to be executed 
    # and the input arguments (the queue)
    processes = [Process(target=mp_worker, args=(d,queue,)) for _ in range(2)]

    for process in processes:
        process.start()
        print('Process started')

    for process in processes:
        process.join()



if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.list()
    for id in id_list:
        queue.put(id)
    mp_handler()
    print(d)  #