Реализация особого типа многопроцессорной очереди в Python

#python #queue #multiprocessing

#python #очередь #многопроцессорная обработка

Вопрос:

Представьте перевернутое двоичное дерево с узлами A, B, C, D, E, F на уровне 0. узлы G, H, I на уровне 1, узел J на уровне 2 и узел K на уровне 3.

Уровень 1: G = функция (A,B), H = функция (C,D), I = функция (E,F)

Уровень 2: J = функция(G,H)

Уровень 3: K = функция (J,I).

Каждая пара узлов на уровне 0 должна обрабатываться по порядку, каждая пара узлов на уровне 1 может обрабатываться в любом порядке, но результат на следующем уровне должен обрабатываться, как показано, и так далее, пока мы не получим конечный результат, K.

Актуальной проблемой является задача вычислительной геометрии, в которой последовательность твердых тел соединяется вместе. A примыкает к B, который примыкает к C, и так далее. Результирующий предохранитель A и B (G) примыкает к предохранителю C и D (H). Результирующее соединение J и I (K) является конечным результатом. Таким образом, вы не можете объединить G и I, поскольку они не являются смежными. Если количество узлов на уровне не равно степени 2, в итоге получается зависший объект, который необходимо обработать на один уровень дальше.

Поскольку процесс fuse является дорогостоящим с точки зрения вычислений и требует много памяти, но очень параллельным, я хотел бы использовать пакет многопроцессорной обработки Python и некоторую форму очереди. После вычисления G = func(A,B) я хотел бы поместить результат G в очередь для последующего вычисления J = func(G, H). Когда очередь пуста, последний результат является окончательным результатом. Имейте в виду, что mp.queue не обязательно выдаст результаты FIFO, поскольку I = func(E,F) может завершиться раньше, чем H = func(C,D)

Я придумал несколько (плохих) решений, но я уверен, что есть элегантное решение, недоступное моему пониманию. Предложения?

Комментарии:

1. Почему level=0 должен обрабатываться по порядку, но level = 1 может обрабатываться в любом порядке? Разве недостаточно выбрать два известных листа и объединить их в один узел?

2. Я ошибаюсь, говоря, что узлы должны обрабатываться по порядку. Они должны обрабатываться с точки зрения смежности. A примыкает к B, примыкает к C и так далее для уровня 0. Вы можете выполнять функции func(A,B) или func(B, C), но не func(A,C). Аналогично, на уровне 1 G примыкает к H, примыкает к I. Вы можете выполнять функции func(G,H) или func(H, I), но не func(G, I).

Ответ №1:

Я не смог придумать разумный дизайн для очереди, но вы можете легко заменить очередь еще одним процессом, который в моем примере я вызвал WorkerManager . Этот процесс собирает результаты от всех Worker процессов и запускает новых рабочих только в том случае, если есть два смежных пакета данных, ожидающих обработки. Таким образом, вы никогда не будете пытаться объединить несмежные результаты, поэтому вы можете игнорировать «уровни» и запустить вычисление следующей пары, как только она будет готова.

 from multiprocessing import Process, Queue

class Result(object):
    '''Result from start to end.'''
    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data


class Worker(Process):
    '''Joins two results into one result.'''
    def __init__(self, result_queue, pair):
        self.result_queue = result_queue
        self.pair = pair
        super(Worker, self).__init__()

    def run(self):
        left, right = self.pair
        result = Result(left.start, right.end,
                        '(%s, %s)' % (left.data, right.data))
        self.result_queue.put(result)


class WorkerManager(Process):
    '''
    Takes results from result_queue, pairs them
    and assigns workers to process them.
    Returns final result into final_queue.
    '''
    def __init__(self, result_queue, final_queue, start, end):
        self._result_queue = result_queue
        self._final_queue = final_queue
        self._start = start
        self._end = end
        self._results = []
        super(WorkerManager, self).__init__()

    def run(self):
        while True:
            result = self._result_queue.get()
            self._add_result(result)
            if self._has_final_result():
                self._final_queue.put(self._get_final_result())
                return
            pair = self._find_adjacent_pair()
            if pair:
                self._start_worker(pair)

    def _add_result(self, result):
        self._results.append(result)
        self._results.sort(key=lambda result: result.start)

    def _has_final_result(self):
        return (len(self._results) == 1
                and self._results[0].start == self._start
                and self._results[0].end == self._end)

    def _get_final_result(self):
        return self._results[0]

    def _find_adjacent_pair(self):
        for i in xrange(len(self._results) - 1):
            left, right = self._results[i], self._results[i   1]
            if left.end == right.start:
                self._results = self._results[:i]   self._results[i   2:]
                return left, right

    def _start_worker(self, pair):
        worker = Worker(self._result_queue, pair)
        worker.start()

if __name__ == '__main__':
    DATA = [Result(i, i   1, str(i)) for i in xrange(6)]
    result_queue = Queue()
    final_queue = Queue()
    start = 0
    end = len(DATA)
    man = WorkerManager(result_queue, final_queue, start, end)
    man.start()
    for res in DATA:
        result_queue.put(res)
    final = final_queue.get()
    print final.start
    # 0
    print final.end
    # 6
    print final.data
    # For example:
    # (((0, 1), (2, 3)), (4, 5))
  

Для моего примера я использовал простой Worker , который возвращает заданные данные в круглых скобках, разделенных запятой, но вы могли бы поместить туда любые вычисления. В моем случае конечным результатом было (((0, 1), (2, 3)), (4, 5)) , что означает, что алгоритм вычислил (0, 1) и (2, 3) перед вычислением ((0, 1), (2, 3)) , а затем объединил результат с (4, 5) . Я надеюсь, это то, что вы искали.

Комментарии:

1. Я придумал решение, которое выглядит следующим образом: def fuser(shapes): shape1_id, shape1 = shapes[0] shape2_id, shape2 = shapes[1] fused = OCC. BRepAlgoAPI. BRepAlgoAPI_Fuse(shape1, shape2). Shape() возвращает ((shape1_id, shape2_id), объединенный) результаты = [(i,a) для i, a в перечислении (фрагменты)] пока len(результаты) > 1: P = обработка. Пул(7 результатов = P.map(fuser, [(a,b) для a,b в zip(результаты[::2],результаты[1::2])]) results.sort(ключ= результат лямбда: результат[0])