многопроцессорная обработка с вложенным словарем

#python #python-multiprocessing

#python #python-многопроцессорная обработка

Вопрос:

Есть ли способ передать вложенный словарь в многопроцессорную обработку?

 d = {'a': {'x': 1, 'y':100},
     'b': {'x': 2, 'y':200}}
 

Я надеялся запустить два параллельных задания, одно для {'a': {'x':1, 'y':100}} , а другое для {'b': {'x': 2, 'y':200}} , и использовать следующую функцию для создания нового словаря

 def f(d):
    key = dd.keys()
    new_d[key]['x'] = d[key]['x']*2
    new_d[key]['y'] = d[key]['y']*2
 

Это была моя неудачная попытка

 import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x']*2
    container[key]['y'] = d[key]['y']*2
    
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    container = manager.dict()
    d = manager.dict()
    
    d['a'] = {'x': 1, 'y':100}
    d['b'] = {'x': 2, 'y':200}
        
    p1 = multiprocessing.Process(target=f, args=('a',d, container))
    p2 = multiprocessing.Process(target=f, args=('b',d, container))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()
 

Я получаю KeyError: 'b' , а также, я хотел бы избежать необходимости указывать количество процессов вручную, например p1 , и p2 и так далее. Может быть, есть другой способ?

Комментарии:

1. попробуйте container[key] = {} перед другой строкой 2, без которой вы не сможете назначить 2-й уровень

2. @azro, ваше решение не даст мне никаких ошибок, но print(container) приведет к {'a': {}, 'b': {}}

3. @azro, да, я это сделал, я добавил container[key] = {} непосредственно перед двумя другими строками f . Работает ли это с вами?

4. Независимо от правильного ответа на ваш непосредственный вопрос. Создание общего словаря и его модификация несколькими потоками — почти наверняка плохая идея. Один поток фактически владеет объектом, а другие управляют им через прокси.

Ответ №1:

Вложенные dicts также должны управляться. Я добавил этот шаг в ваш код, а также сделал все зависящим от членов d , поэтому вам не нужно иметь дело с p1 , p2 , и т. Д:

 import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x']*2
    container[key]['y'] = d[key]['y']*2

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    container = manager.dict()
    d = manager.dict()

    d['a'] = {'x': 1, 'y':100}
    d['b'] = {'x': 2, 'y':200}

    # This line initialises the nested dicts
    for key in d:
        container[key] = manager.dict()

    # Here we create a list with the processes we started
    processes = []
    for key in d:
        p = multiprocessing.Process(target=f, args=(key ,d, container))
        p.start()
        processes.append(p)

    # And finally wait for all of them to finish
    for p in processes:
        p.join()

    # Show the results
    print(container['a'])
    print(container['b'])
 

multiprocessing.Pool Класс может быть лучшим решением вашей проблемы, хотя (проверьте документы)

Ответ №2:

@nonDucor прав: вы должны создавать вложенные словари, используя Manager объект.

Ниже приведено сокращенное решение, использующее более Pythonic создание словаря, а также использование ProcessPoolExecutor интерфейса для параллелизма:

 from concurrent.futures import ProcessPoolExecutor as Executor
import multiprocessing

def f(key, d, container):
    container[key]['x'] = d[key]['x'] * 2
    container[key]['y'] = d[key]['y'] * 2

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    d = manager.dict({
        'a': manager.dict({'x': 1, 'y': 100}),
        'b': manager.dict({'x': 2, 'y': 200}),
    })
    container = manager.dict({x: manager.dict() for x in d.keys()})
    with Executor() as exe:
        exe.submit(f, 'a', d, container)
        exe.submit(f, 'b', d, container)
        
    for the_dict in (d, container):
        print([the_dict[x].items() for x in the_dict.keys()])
 

Для сравнения, ниже мы используем многопоточность вместо многопроцессорной обработки. Поскольку память используется обоими потоками, нет необходимости в защищенных словарях — простые старые dicts работают просто отлично. Но чтобы сделать целевой словарь более динамичным и независимым от источника dict при создании, мы используем defaultdict defaultdict структуру данных of s.:

 from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor as Executor

def f(key, d, container):
    container[key]['x'] = d[key]['x'] * 2
    container[key]['y'] = d[key]['y'] * 2

if __name__ == '__main__':
    d ={
        'a': {'x': 1, 'y': 100},
        'b': {'x': 2, 'y': 200},
    }
    container = defaultdict(lambda: defaultdict(int))
    with Executor() as exe:
        exe.submit(f, 'a', d, container)
        exe.submit(f, 'b', d, container)
        
    for the_dict in (d, container):
        print([the_dict[x].items() for x in the_dict.keys()])