Как правильно разделить команду менеджера между процессами

#python #dictionary #python-multiprocessing #multiprocessing-manager

#python #словарь #python-многопроцессорность #многопроцессорный менеджер

Вопрос:

Что я хотел бы сделать, так это разделить словарь между подклассами Process , и когда один процесс обновляет словарь, другой получает уведомление об его использовании. Это проиллюстрировано в приведенном ниже коде, где MyProducer начинается заполнение словаря и на каждой итерации запускается событие для уведомления MyConsumer о необходимости обработки словаря. Все работает, кроме той части, где словарь в MyConsumer пуст…

 from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    increment = 0
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event
    
    def run(self):
        while self.increment < 20:
            self.dictionary[self.increment]=self.increment 10
            self.increment = self.increment   1
            print("From producer: ", self.dictionary)
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment   1
        
class MyConsumer(Process):
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event
        
    
    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.dictionary)
            self.event.clear()
            

            
if __name__ == "__main__":

    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()    
  

Результат таков

 Process MyProducer-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "main.py", line 13, in run
    self.dictionary[self.increment]=self.increment 10
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

  

Обновить

Мое намерение состоит в том, чтобы понять, почему словарь не работает с подклассами процессов. Я знаю все работающие случаи, которые вы можете найти в Интернете. На самом деле у меня есть решение, которое отлично работает, просто замените dict на queue, я хочу понять, почему dict не работает.

 from multiprocessing import Process, Queue, Event

class MyProducer(Process):
    increment = 0
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event
    
    def run(self):
        while self.increment < 20:
            self.queue.put([self.increment,self.increment 10])
            self.increment = self.increment   1
            print("From producer: ", self.queue.qsize())
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment   1
        
class MyConsumer(Process):
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event
        
    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.queue.qsize())
            self.event.clear()
            

if __name__ == "__main__":
  state_queue = Queue()
  state_ready = Event()
  producerprocess = MyProducer(state_queue, state_ready)
  consumerprocess = MyConsumer(state_queue, state_ready)
  producerprocess.start()
  consumerprocess.start()  

  

Ответ №1:

К вашему сведению, я вижу почти такую же смерть с этой более простой программой:

 from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        print("at producer start", self.val.value)
        self.val.value = 42
        self.event.set()

class MyConsumer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        self.event.wait()
        print("From consumer: ", self.val.value)
                        
if __name__ == "__main__":
    with Manager() as manager:
        state_value = manager.Value('i', 666)
        state_ready = Event()
        producerprocess = MyProducer(state_value, state_ready)
        consumerprocess = MyConsumer(state_value, state_ready)
        producerprocess.start()
        consumerprocess.start()
  

Подразумевается, что никакой объект, полученный из Manager , не восстанавливается с пользой, когда он прикреплен в качестве атрибута к объекту, который mp должен создать «по волшебству» в рабочем процессе. Информация, необходимая для подключения к Manager серверному процессу, по-видимому, теряется (будь то сокет в системах Linux-y или именованный канал в Windows).

Вы могли бы отправить отчет об ошибке, но до этого с этим ничего нельзя сделать, кроме как переписать код, чтобы не использовать Manager или явно передавать объекты Manager функциям.

Отчет об ошибке может иметь два вида разрешений: (1) заставить его «работать»; или, (2) код изменен, чтобы вызвать исключение при попытке создать такой объект.

Еще одна возможность (неопробованная): если вы работаете только в Linux, вы можете пропустить __name__ == "__main__" тест и надеяться, что Manager информация о соединении сохранится fork() .

Редактировать

Я открыл проблему на трекере проекта Python, здесь:

https://bugs.python.org/issue41660

ОБХОДНОЙ ПУТЬ

Играя с материалом в отчете о проблеме Python, «проблема» здесь, похоже, связана не с тем, как все настраивается, а с вашим кодом, игнорирующим необходимость чистого отключения рабочих. Просто добавьте эту строку в конец вашего кода ( dict версия — та, о которой вы заботитесь):

     producerprocess.join()
  

достаточно, чтобы на моем компьютере прямо сейчас (Win 10 Python 3.8.5) он выдавал ожидаемый результат. Однако тогда он зависает навсегда, потому что ваш потребитель .wait() навсегда для Event , который больше никогда не устанавливается.

Мое предположение (в правильности которого я уверен на 80%): без .join() основной процесс переходит к запуску кода завершения работы интерпретатора (ему больше нечего делать!), И это начинает принудительно уничтожать материал, который multiprocessing реализация все еще должна корректно функционировать.

С .join() , основной процесс блокируется до тех пор, пока производитель не будет завершен — код завершения работы не запускается на время и .join() явно инструктирует процесс-производитель завершить свою часть (тщательно продуманного!) multiprocessing танца чисто.

Это все еще может привести к повреждению процесса-потребителя, но, если это так, мы никогда не увидим доказательств этого, потому что потребитель навсегда заблокирован на своем self.event.wait() .

В реальной программе вы должны сделать все возможное, чтобы также полностью завершить процесс потребителя.

ПОЛНЫЙ КОД

Вот полная программа, демонстрирующая идиоматический Python и лучшие практики параллельного программирования: все завершается чисто, никаких «циклов занятости», никаких гонок, никаких взаимоблокировок. Реализация State более сложна, чем требует эта конкретная проблема, но иллюстрирует мощный подход, который стоит изучить.

 import multiprocessing as mp

P, C, F = 1, 2, 4 # bit flags for state values

# Unusual synchronization appears to be wanted here:
# After a producer makes a mutation, it must not make another
# before the consumer acts on it.  So we'll say we're in state
# P when the producer is allowed to mutate, and in state C
# when there's a mutation for the consumer to process.  Another
# state - F (for "finished") - tells the consumer it's time to
# quit. The producer stops on its own when it gets tired of
# mutating ;-)
class State:
    def __init__(self):
        # Initial state is empty - everyone is blocked.
        # Note that we do our own locking around the shared
        # memory, via the condition variable's mutex, so
        # it would be pure waste for the Value to have
        # its own lock too.
        self.state = mp.Value('B', 0, lock=False)
        self.changed = mp.Condition()

    # Wait for state to change to one of the states in the
    # flag mask `what`.  Return the bit flag of the state
    # that succeeded.
    def waitfor(self, what):
        with self.changed:
            while not (self.state.value amp; what):
                self.changed.wait()
            return self.state.value

    # Force state to (bit flag) `what`, and notify waiters
    # to wake up and see whether it's the state they're
    # waiting for.
    def setwhat(self, what):
        with self.changed:
            self.state.value = what
            self.changed.notify_all()

class Base(mp.Process):
    def __init__(self, dictionary, state):
        super().__init__()
        self.dictionary = dictionary
        self.state = state

class MyProducer(Base):
    def __init__(self, *args):
        super().__init__(*args)
        self.increment = 0

    def run(self):
        while self.increment < 20:
            self.state.waitfor(P)
            self.dictionary[self.increment] = self.increment   10
            self.state.setwhat(C)
            # Whether the producer or the consumer prints the dict
            # first isn't forced - and, indeed, they can both print at
            # the same time, producing garbled output.  Move the
            # print() above the setwhat(C) to force the producer
            # to print first, if desired.
            print("From producer: ", self.dictionary)
            self.increment  = 1

class MyConsumer(Base):
    def run(self):
        while self.state.waitfor(C | F) != F:
            print("From consumer: ", self.dictionary)
            self.state.setwhat(P)

def main():
    with mp.Manager() as manager:
        state_dict = manager.dict()
        state_state = State()
        producerprocess = MyProducer(state_dict, state_state)
        consumerprocess = MyConsumer(state_dict, state_state)
        producerprocess.start()
        consumerprocess.start()

        # The producer is blocked waiting for state P, and the
        # consumer is blocked waiting for state C (or F). The
        # loop here counts down 5 seconds, so you can verify
        # by eyeball that the waits aren't "busy" (they consume
        # essentially no CPU cycles).
        import time
        for i in reversed(range(5)):
            time.sleep(1)
            print(i)

        state_state.setwhat(P) # tell the producer to start!
        producerprocess.join() # and wait for it to finish
        # wait for the consumer to finish eating the last mutation
        state_state.waitfor(P)
        # tell the consumer we're all done
        state_state.setwhat(F)
        consumerprocess.join()    

if __name__ == "__main__":
    main()