Использование общего списка с многопроцессорной обработкой pathos вызывает ошибку «отправленный дайджест был отклонен»

#python #multiprocessing #dill #pathos

#питон #многопроцессорная обработка #укроп #pathos

Вопрос:

Я пытаюсь использовать многопроцессорную обработку для создания сложных, не поддающихся отбору объектов в соответствии со следующим фрагментом кода:

 from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool

class Facility:

    def __init__(self):
        self.blocks = Manager().list()

    def __process_blocks(self, block):
        designer = block["designer"]
        apply_terrain = block["terrain"]
        block_type = self.__block_type_to_string(block["type"])
        block = designer.generate_block(block_id=block["id"],
                                            block_type=block_type,
                                            anchor=Point(float(block["anchor_x"]), float(block["anchor_y"]),
                                                         float(block["anchor_z"])),
                                            pcu_anchor=Point(float(block["pcu_x"]), float(block["pcu_y"]), 0),
                                            corridor_width=block["corridor"],
                                            jb_height=block["jb_connect_height"],
                                            min_boxes=block["min_boxes"],
                                            apply_terrain=apply_terrain)
        self.blocks.append(block)

    def design(self, apply_terrain=False):
        designer = FacilityBuilder(string_locator=self._string_locator, string_router=self._string_router,
                                   box_router=self._box_router, sorter=self._sorter,
                                   tracker_configurator=self._tracker_configurator, config=self._config)
        blocks = [block.to_dict() for index, block in self._store.get_blocks().iterrows()]
        for block in blocks:
            block["designer"] = designer
            block["terrain"] = apply_terrain

        with ProcessingPool() as pool:
            pool.map(self.__process_blocks, blocks)
 

(Изо всех сил пытаюсь воспроизвести это с помощью более простого кода, поэтому я показываю фактический код)

Мне нужно обновить разделяемую переменную, поэтому я инициализирую переменную уровня класса, используя a multiprocessing.Manager следующим образом:

 self.blocks = Manager().list()
 

Это оставляет меня со следующей ошибкой (только частичная трассировка стека):

   File "C:UsersPaul.NelDocumentsreposautoPV.autopvlibsite-packagesdill_dill.py", line 481, in load
    obj = StockUnpickler.load(self)
  File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingmanagers.py", line 933, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingmanagers.py", line 783, in __init__
    self._incref()
  File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingmanagers.py", line 837, in _incref
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingconnection.py", line 513, in Client
    answer_challenge(c, authkey)
  File "C:UsersPaul.NelAppDataLocalProgramsPythonPython39libmultiprocessingconnection.py", line 764, in answer_challe
nge
    raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected
 

В качестве последнего средства я попытался использовать python стандартную ThreadPool реализацию, чтобы попытаться обойти pickle проблему, но и это не прошло хорошо. Я читал о многих подобных проблемах, но не нашел решения этой конкретной проблемы. Проблема в том dill или в том, как pathos взаимодействует с mulitprocessing.Manager ?

РЕДАКТИРОВАТЬ: итак, мне удалось воспроизвести это с помощью примера кода следующим образом:

 import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool


class MyComplex:

    def __init__(self, x):
        self._z = x * x

    def me(self):
        return math.sqrt(self._z)


class Starter:

    def __init__(self):
        manager = Manager()
        self.my_list = manager.list()

    def _f(self, value):
        print(f"{value.me()} on {os.getpid()}")
        self.my_list.append(value.me)

    def start(self):
        names = [MyComplex(x) for x in range(100)]

        with ProcessingPool() as pool:
            pool.map(self._f, names)


if __name__ == '__main__':
    starter = Starter()
    starter.start()
 

Ошибка возникает при добавлении self.my_list = manager.list() .

Ответ №1:

Итак, я решил эту проблему. Я все равно был бы великолепен, если бы кто-то вроде mmckerns или кто-то еще, обладающий большими знаниями, чем я, в многопроцессорной обработке, мог прокомментировать, почему это решение.

Проблема, по-видимому, заключалась в том, что Manager().list() было объявлено в __init__ . Следующий код работает без каких-либо проблем:

 import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool


class MyComplex:

    def __init__(self, x):
        self._z = x * x

    def me(self):
        return math.sqrt(self._z)


class Starter:

    def _f(self, value):
        print(f"{value.me()} on {os.getpid()}")
        return value.me()

    def start(self):
        manager = Manager()
        my_list = manager.list()
        names = [MyComplex(x) for x in range(100)]

        with ProcessingPool() as pool:
            my_list.append(pool.map(self._f, names))
        print(my_list)


if __name__ == '__main__':
    starter = Starter()
    starter.start()
 

Здесь я объявляю list локальную ProcessingPool операцию. Впоследствии я могу присвоить результат переменной уровня класса, если захочу.

Комментарии:

1. Привет @Paul: последний код передает объект с гораздо более простой цепочкой зависимостей и, следовательно, имеет больше шансов на успех. Если вы хотите увидеть, что вы передаете сериализатору, вы можете включить трассировку pickle с помощью: import dill; dill.detect.trace(True) . Он будет печатать цепочку зависимостей по мере сериализации объекта.

2. Спасибо @MikeMcKerns. Кстати, отличная работа dill . Это открывает так много дверей, которые были закрыты из pickle -за констант.