#python #python-multiprocessing
#python #python-многопроцессорная обработка
Вопрос:
Есть ли способ передать вложенный словарь в многопроцессорную обработку?
d = {'a': {'x': 1, 'y':100},
'b': {'x': 2, 'y':200}}
Я надеялся запустить два параллельных задания, одно для {'a': {'x':1, 'y':100}}
, а другое для {'b': {'x': 2, 'y':200}}
, и использовать следующую функцию для создания нового словаря
def f(d):
key = dd.keys()
new_d[key]['x'] = d[key]['x']*2
new_d[key]['y'] = d[key]['y']*2
Это была моя неудачная попытка
import multiprocessing
def f(key, d, container):
container[key]['x'] = d[key]['x']*2
container[key]['y'] = d[key]['y']*2
if __name__ == '__main__':
manager = multiprocessing.Manager()
container = manager.dict()
d = manager.dict()
d['a'] = {'x': 1, 'y':100}
d['b'] = {'x': 2, 'y':200}
p1 = multiprocessing.Process(target=f, args=('a',d, container))
p2 = multiprocessing.Process(target=f, args=('b',d, container))
p1.start()
p2.start()
p1.join()
p2.join()
Я получаю KeyError: 'b'
, а также, я хотел бы избежать необходимости указывать количество процессов вручную, например p1
, и p2
и так далее. Может быть, есть другой способ?
Комментарии:
1. попробуйте
container[key] = {}
перед другой строкой 2, без которой вы не сможете назначить 2-й уровень2. @azro, ваше решение не даст мне никаких ошибок, но
print(container)
приведет к{'a': {}, 'b': {}}
3. @azro, да, я это сделал, я добавил
container[key] = {}
непосредственно перед двумя другими строкамиf
. Работает ли это с вами?4. Независимо от правильного ответа на ваш непосредственный вопрос. Создание общего словаря и его модификация несколькими потоками — почти наверняка плохая идея. Один поток фактически владеет объектом, а другие управляют им через прокси.
Ответ №1:
Вложенные dicts также должны управляться. Я добавил этот шаг в ваш код, а также сделал все зависящим от членов d
, поэтому вам не нужно иметь дело с p1
, p2
, и т. Д:
import multiprocessing
def f(key, d, container):
container[key]['x'] = d[key]['x']*2
container[key]['y'] = d[key]['y']*2
if __name__ == '__main__':
manager = multiprocessing.Manager()
container = manager.dict()
d = manager.dict()
d['a'] = {'x': 1, 'y':100}
d['b'] = {'x': 2, 'y':200}
# This line initialises the nested dicts
for key in d:
container[key] = manager.dict()
# Here we create a list with the processes we started
processes = []
for key in d:
p = multiprocessing.Process(target=f, args=(key ,d, container))
p.start()
processes.append(p)
# And finally wait for all of them to finish
for p in processes:
p.join()
# Show the results
print(container['a'])
print(container['b'])
multiprocessing.Pool
Класс может быть лучшим решением вашей проблемы, хотя (проверьте документы)
Ответ №2:
@nonDucor прав: вы должны создавать вложенные словари, используя Manager
объект.
Ниже приведено сокращенное решение, использующее более Pythonic создание словаря, а также использование ProcessPoolExecutor
интерфейса для параллелизма:
from concurrent.futures import ProcessPoolExecutor as Executor
import multiprocessing
def f(key, d, container):
container[key]['x'] = d[key]['x'] * 2
container[key]['y'] = d[key]['y'] * 2
if __name__ == '__main__':
manager = multiprocessing.Manager()
d = manager.dict({
'a': manager.dict({'x': 1, 'y': 100}),
'b': manager.dict({'x': 2, 'y': 200}),
})
container = manager.dict({x: manager.dict() for x in d.keys()})
with Executor() as exe:
exe.submit(f, 'a', d, container)
exe.submit(f, 'b', d, container)
for the_dict in (d, container):
print([the_dict[x].items() for x in the_dict.keys()])
Для сравнения, ниже мы используем многопоточность вместо многопроцессорной обработки. Поскольку память используется обоими потоками, нет необходимости в защищенных словарях — простые старые dicts работают просто отлично. Но чтобы сделать целевой словарь более динамичным и независимым от источника dict
при создании, мы используем defaultdict
defaultdict
структуру данных of s.:
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor as Executor
def f(key, d, container):
container[key]['x'] = d[key]['x'] * 2
container[key]['y'] = d[key]['y'] * 2
if __name__ == '__main__':
d ={
'a': {'x': 1, 'y': 100},
'b': {'x': 2, 'y': 200},
}
container = defaultdict(lambda: defaultdict(int))
with Executor() as exe:
exe.submit(f, 'a', d, container)
exe.submit(f, 'b', d, container)
for the_dict in (d, container):
print([the_dict[x].items() for x in the_dict.keys()])