многопроцессорная обработка python JoinableQueue PicklingError

#python #queue #multiprocessing

#python #очередь #многопроцессорная обработка

Вопрос:

Sorry…it кажется, я задал популярный вопрос, но я не могу найти ничего полезного для моего случая из stackflow: P

итак, мой код выполняет следующие действия:

шаг 1. родительский процесс записывает объект задачи в многопроцессорную обработку.JoinableQueue

шаг 2. дочерний процесс (более 1) считывает (получает) объект задачи из JoinableQueue и выполняет задачу

моя структура модуля:

A.py

  • Задача класса (объект)

  • Класс WorkerPool(объект)

  • Рабочий класс (многопроцессорная обработка.Процесс)

    • def run() # здесь выполняется шаг 2
  • Класс TestGroup()

    • def LoadTest() # здесь выполняется шаг 1 выше, т.Е. Добавляется объект задачи

Я понимаю, что когда mp.Используется JoinableQueue, добавляемые объекты должны быть выбираемыми, я понял значение «выбираемого» из https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled

мои вопросы: 1. можно ли выбрать объект задачи в моем случае?

  1. Я получил ошибку ниже, когда код добавляет объекты задачи в JoinableQueue:

    Файл «/usr/lib/python2.6/multiprocessing/queues.py «, строка 242, в _feed

2014-06-23 03:18:43 ИНФОРМАЦИОННАЯ тестовая группа: G1 Конечный нагрузочный тест: object1

2014-06-23 03:18:43 ИНФОРМАЦИОННАЯ тестовая группа: G1 Конечный нагрузочный тест: object2

2014-06-23 03:18:43 ИНФОРМАЦИОННАЯ тестовая группа: G1 Конечный нагрузочный тест: отправка object3(obj)

Ошибка PicklingError: не удается выполнить поиск: поиск атрибутов pysphere.resources.VimService_services_types.Сбой DynamicData_Holder

  1. Каково общее использование mp.JoinableQueue? в моем случае мне нужно использовать join() и task_done()

  2. Когда я решаю использовать Queue.Очередь вместо mp.JoinableQueue, ошибка выбора просто исчезла, однако, проверяя журнал, я обнаружил, что все дочерние процессы продолжают работать с первым объектом очереди, в чем возможная причина этой ситуации?

Ответ №1:

multiprocessing Модуль в Python запускает несколько процессов для выполнения ваших задач. Поскольку процессы не совместно используют память, они должны иметь возможность обмениваться данными с использованием сериализованных данных. многопроцессорная обработка использует модуль pickle для выполнения сериализации, поэтому требование, чтобы объекты, которые вы передаете задачам, были доступны для выбора.

1) Кажется, что ваш объект task содержит экземпляр из py sphere.resource.VimService_services_types . Вероятно, это ссылка на системный ресурс, такой как открытый файл. Это не может быть сериализовано или передано от одного процесса к другому, и поэтому это вызывает ошибку травления.

Что вы можете сделать с mp.JoinableQueue передает необходимые аргументы задаче и запускает службу в самой задаче, чтобы она была локальной для этого процесса.

Например:

 queue = mp.JoinableQueue()
# not queue.put(task), since the new process will create the task
queue.put(task_args)

def f(task_args):
    task = Task(task_args)
    ...
    # you can't return the task, unless you've closed all non-serializable parts
    return task.result

process = Process(target=f, args=(queue,))
... 
  

2) Очередь.Очередь предназначена для потоковой передачи. Он использует общую память и механизмы синхронизации для обеспечения атомарных операций. Однако, когда вы запускаете новый процесс с многопроцессорной обработкой, он копирует начальный процесс, и поэтому каждый дочерний процесс будет работать с одними и теми же объектами очереди, поскольку очередь в памяти была скопирована для каждого процесса.

Комментарии:

1. Спасибо, Джонатан, я поместил имя обращения (строковый тип) в очередь вместо самого объекта обращения, больше не сообщается о picklingerror…