#python #queue #multiprocessing
#python #очередь #многопроцессорная обработка
Вопрос:
Представьте перевернутое двоичное дерево с узлами A, B, C, D, E, F на уровне 0. узлы G, H, I на уровне 1, узел J на уровне 2 и узел K на уровне 3.
Уровень 1: G = функция (A,B), H = функция (C,D), I = функция (E,F)
Уровень 2: J = функция(G,H)
Уровень 3: K = функция (J,I).
Каждая пара узлов на уровне 0 должна обрабатываться по порядку, каждая пара узлов на уровне 1 может обрабатываться в любом порядке, но результат на следующем уровне должен обрабатываться, как показано, и так далее, пока мы не получим конечный результат, K.
Актуальной проблемой является задача вычислительной геометрии, в которой последовательность твердых тел соединяется вместе. A примыкает к B, который примыкает к C, и так далее. Результирующий предохранитель A и B (G) примыкает к предохранителю C и D (H). Результирующее соединение J и I (K) является конечным результатом. Таким образом, вы не можете объединить G и I, поскольку они не являются смежными. Если количество узлов на уровне не равно степени 2, в итоге получается зависший объект, который необходимо обработать на один уровень дальше.
Поскольку процесс fuse является дорогостоящим с точки зрения вычислений и требует много памяти, но очень параллельным, я хотел бы использовать пакет многопроцессорной обработки Python и некоторую форму очереди. После вычисления G = func(A,B) я хотел бы поместить результат G в очередь для последующего вычисления J = func(G, H). Когда очередь пуста, последний результат является окончательным результатом. Имейте в виду, что mp.queue не обязательно выдаст результаты FIFO, поскольку I = func(E,F) может завершиться раньше, чем H = func(C,D)
Я придумал несколько (плохих) решений, но я уверен, что есть элегантное решение, недоступное моему пониманию. Предложения?
Комментарии:
1. Почему level=0 должен обрабатываться по порядку, но level = 1 может обрабатываться в любом порядке? Разве недостаточно выбрать два известных листа и объединить их в один узел?
2. Я ошибаюсь, говоря, что узлы должны обрабатываться по порядку. Они должны обрабатываться с точки зрения смежности. A примыкает к B, примыкает к C и так далее для уровня 0. Вы можете выполнять функции func(A,B) или func(B, C), но не func(A,C). Аналогично, на уровне 1 G примыкает к H, примыкает к I. Вы можете выполнять функции func(G,H) или func(H, I), но не func(G, I).
Ответ №1:
Я не смог придумать разумный дизайн для очереди, но вы можете легко заменить очередь еще одним процессом, который в моем примере я вызвал WorkerManager
. Этот процесс собирает результаты от всех Worker
процессов и запускает новых рабочих только в том случае, если есть два смежных пакета данных, ожидающих обработки. Таким образом, вы никогда не будете пытаться объединить несмежные результаты, поэтому вы можете игнорировать «уровни» и запустить вычисление следующей пары, как только она будет готова.
from multiprocessing import Process, Queue
class Result(object):
'''Result from start to end.'''
def __init__(self, start, end, data):
self.start = start
self.end = end
self.data = data
class Worker(Process):
'''Joins two results into one result.'''
def __init__(self, result_queue, pair):
self.result_queue = result_queue
self.pair = pair
super(Worker, self).__init__()
def run(self):
left, right = self.pair
result = Result(left.start, right.end,
'(%s, %s)' % (left.data, right.data))
self.result_queue.put(result)
class WorkerManager(Process):
'''
Takes results from result_queue, pairs them
and assigns workers to process them.
Returns final result into final_queue.
'''
def __init__(self, result_queue, final_queue, start, end):
self._result_queue = result_queue
self._final_queue = final_queue
self._start = start
self._end = end
self._results = []
super(WorkerManager, self).__init__()
def run(self):
while True:
result = self._result_queue.get()
self._add_result(result)
if self._has_final_result():
self._final_queue.put(self._get_final_result())
return
pair = self._find_adjacent_pair()
if pair:
self._start_worker(pair)
def _add_result(self, result):
self._results.append(result)
self._results.sort(key=lambda result: result.start)
def _has_final_result(self):
return (len(self._results) == 1
and self._results[0].start == self._start
and self._results[0].end == self._end)
def _get_final_result(self):
return self._results[0]
def _find_adjacent_pair(self):
for i in xrange(len(self._results) - 1):
left, right = self._results[i], self._results[i 1]
if left.end == right.start:
self._results = self._results[:i] self._results[i 2:]
return left, right
def _start_worker(self, pair):
worker = Worker(self._result_queue, pair)
worker.start()
if __name__ == '__main__':
DATA = [Result(i, i 1, str(i)) for i in xrange(6)]
result_queue = Queue()
final_queue = Queue()
start = 0
end = len(DATA)
man = WorkerManager(result_queue, final_queue, start, end)
man.start()
for res in DATA:
result_queue.put(res)
final = final_queue.get()
print final.start
# 0
print final.end
# 6
print final.data
# For example:
# (((0, 1), (2, 3)), (4, 5))
Для моего примера я использовал простой Worker
, который возвращает заданные данные в круглых скобках, разделенных запятой, но вы могли бы поместить туда любые вычисления. В моем случае конечным результатом было (((0, 1), (2, 3)), (4, 5))
, что означает, что алгоритм вычислил (0, 1)
и (2, 3)
перед вычислением ((0, 1), (2, 3))
, а затем объединил результат с (4, 5)
. Я надеюсь, это то, что вы искали.
Комментарии:
1. Я придумал решение, которое выглядит следующим образом: def fuser(shapes): shape1_id, shape1 = shapes[0] shape2_id, shape2 = shapes[1] fused = OCC. BRepAlgoAPI. BRepAlgoAPI_Fuse(shape1, shape2). Shape() возвращает ((shape1_id, shape2_id), объединенный) результаты = [(i,a) для i, a в перечислении (фрагменты)] пока len(результаты) > 1: P = обработка. Пул(7 результатов = P.map(fuser, [(a,b) для a,b в zip(результаты[::2],результаты[1::2])]) results.sort(ключ= результат лямбда: результат[0])