Общие объекты в общих объектах с многопроцессорной обработкой

#python #multiprocessing #shared-memory #python-multiprocessing

#python #многопроцессорная обработка #разделяемая память #python-многопроцессорная обработка

Вопрос:

Я сталкиваюсь с очень постоянной проблемой: я хотел бы совместно использовать сложные объекты между процессами, используя доступные для совместного использования типы, предложенные в multiprocessing (например dict , list , и т. Д. которые обрабатываются прокси SyncManager -серверами ). Я даже реализовал несколько других распространенных типов, таких как deque и set . Все работает, пока я храню простые значения в этих объектах (числа с плавающей запятой, целые числа и т. Д.).

Например, если я использую следующее, это работает идеально, как и ожидалось:

 import multiprocessing, time

manager = multiprocessing.Manager()
d       = manager.list()
lock    = manager.Lock()

def reader(d, lock):
    for i in range(5):
        with lock:
            print(d)
            sys.stdout.flush()
        time.sleep(0.5)

def writer(d, lock):
    for i in range(5):
        with lock:
            d.append(i)
        time.sleep(0.5)

# Try to read and write the deque:
r = multiprocessing.Process(target=reader, args=(d, lock))
w = multiprocessing.Process(target=writer, args=(d, lock))
r.start()
w.start()
r.join()
w.join()
  

Как и ожидалось, это выводит обновленный список на каждой итерации считывателя.

Проблема возникает, когда я хочу сохранить объекты в этих объектах. Допустим, словарь списков. Очевидно, что если я сохраню простые объекты в dict, совместимом с процессом, это не поможет. Итак, я попробовал следующее:

 import multiprocessing, time

manager = multiprocessing.Manager()
d       = manager.dict()
lock    = manager.Lock()

# Add lists:
for i in range(5):
    d[i] = manager.list()

def reader(d, lock):
    for i in range(10):
        with lock:
            print(d)
            sys.stdout.flush()
        time.sleep(1)

def writer(d, lock):
    for i in range(10):
        with lock:
            for j in range(5):
                d[j].append(i)
        time.sleep(1)

# Try to read and write the dict:
r = multiprocessing.Process(target=reader, args=(d, lock))
w = multiprocessing.Process(target=writer, args=(d, lock))
r.start()
w.start()
r.join()
w.join()
  

К сожалению, с этим последним фрагментом кода обновленные списки не распределяются между процессами. Я думал, что использование прокси-серверов сделает это, но это не так.

Единственное решение, которое я нашел, — зафиксировать изменения, внесенные в список, путем явного переназначения ключа в словаре. Очевидно, что если я это сделаю, мне больше не нужно использовать прокси в списках, поскольку это требует ненужных затрат. Итак, решение, которое я нашел до сих пор, для замены

 d[j].append(i)
  

в записи с помощью:

 l = d[j]     # Somehow creates a local copy
l.append(i)  # Modify the copy
d[j] = l     # Reassign to commit the change
  

Хотя это выполняет свою работу, это не так тривиально для записи / понимания и может быть подвержено ошибкам (забывание фиксировать данные и т. Д.).

Я что-то здесь упускаю? Есть ли способ получить общие объекты внутри общих объектов?

Просто для полноты картины, вот код, который у меня есть в конце, и он делает то, что я делаю, но не так, как я хотел бы это сделать, очевидно:

 import multiprocessing, time

manager = multiprocessing.Manager()
d       = manager.dict()
lock    = manager.Lock()
# Add lists:
for i in range(5):
    d[i]    = [] # manager.list() doesn't change anything               

def reader(d, lock):
    for i in range(10):
        with lock:
            print(d)
            sys.stdout.flush()
        time.sleep(1)

def writer(d, lock):
    for i in range(10):
        with lock:
            for j in range(5):
                l = d[j]
                l.append(i)
                d[j] = l
        time.sleep(1)

# Try to read and write the dict:
r = multiprocessing.Process(target=reader, args=(d, lock))
w = multiprocessing.Process(target=writer, args=(d, lock))
r.start()
w.start()
r.join()
w.join()
  

Ответ №1:

Действительно, это невозможно сделать напрямую, поскольку прокси-объект не имеет возможности изменить внутреннюю переменную, поскольку он просто указывает на ячейку памяти и сохраняет ссылку на эту ячейку памяти вместо фактических значений при использовании пользовательских или более продвинутых объектов.

В документации об этом говорится следующее:

Примечание

Изменения изменяемых значений или элементов в прокси-серверах dict и list не будут распространяться через диспетчер, поскольку прокси-сервер не может узнать, когда изменяются его значения или элементы. Чтобы изменить такой элемент, вы можете повторно назначить измененный объект прокси-серверу контейнера:

 # create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
  

Cfr:
http://www.cmi.ac.in/~madhavan/courses/prog2-2015/docs/python-3.4.2-docs-html/library/multiprocessing.html#multiprocessing.managers.SyncManager.list

Ответ №2:

В Python 3.8 есть общие списки. Они не являются полнофункциональными (во-первых, вы не можете изменить их длину), но у них есть другие характеристики списков, включая поддержку сочетания типов. Смотрите класс ShareableList .