#python #multiprocessing #python-multiprocessing #dijkstra
Вопрос:
Я пытаюсь распараллелить алгоритм Дейкстры. Идея в том, чтобы:
- Разделите все узлы на наборы (количество наборов = количество процессоров:
vertexSets[i]
) - Найдите минимальную вершину (с минимальным расстоянием) в каждом наборе с
minDistance()
- Отправьте эту минимальную вершину всем процессам (добавьте эту вершину в набор каждого процесса)
Я использую multiprocessing.Value
переменную with minVertex
для сохранения новой минимальной вершины каждый раз, когда процесс узнает об этом, а затем обновляю эту минимальную вершину для всех процессов. 3 процесса, которые я использую, запускают одну и ту же функцию, чтобы нажать и получить значение minVertex
. Я новичок multiprocessing
в Python, так что, возможно, то, что я использую здесь, не является правильным или лучшим.
Мин вершин перечень должен включать: [0,1,2]
1 от процесса и [7,6]
от процесса 2. Эти значения должны быть добавлены в vertexSet[]
очередь процессов 1, 2, 3. Однако, после того, как minVertex
.стоимость для обновления до всех процессов, иногда minVertex
изменяется на новое значение, прежде чем он будет добавлен в vertexSet
некоторых процессов. Начальная наборы: [0,1,2]
для процесса 1; [3,4,5]
для процесса 2; [6,7,8]
для процесса 3. Часть результата, который я получил ниже
def parallelDijkstra(self, src, vertexSet, minVertex, lock):
# print("vertexSet size", vertexSet.size)
dist = [sys.maxsize] * self.V # assign all nodes of the whole set = infinity
dist[src] = 0
sptSet = [False] * self.V # all nodes are False: no node has been visited yet
before = [-1] * self.V
u = 0
for i in range(vertexSet.size):
u = self.minDistance(dist, sptSet, vertexSet) # extract the min vertext
sptSet[u] = True
lock.acquire()
minVertex.value = u
print("process", multiprocessing.current_process().name)
print("vertexSet before", vertexSet)
print("*minVertex", minVertex.value)
vertexSet = np.append(minVertex.value, vertexSet)
print("vertexSet after", vertexSet)
print(" ")
lock.release()
def separateDataProcessors(self, src, numProcessors):
vertices = []
for i in range(self.V):
vertices.append(i)
vertexSets = np.array_split(vertices, numProcessors)
# [array([0, 1, 2]), array([3, 4, 5]), array([6, 7, 8])]
# the min-distance storing vertex to update to all processes
minVertex = multiprocessing.Value("i")
lock = multiprocessing.Lock()
# creating processes
# for i in range(numProcessors):
p1 = multiprocessing.Process(target=self.parallelDijkstra, name="p1", args=(src, vertexSets[0], minVertex, lock))
p2 = multiprocessing.Process(target=self.parallelDijkstra, name="p2", args=(src, vertexSets[1], minVertex, lock))
p3 = multiprocessing.Process(target=self.parallelDijkstra, name="p3", args=(src, vertexSets[2], minVertex, lock))
# starting processes
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
process p1
process p2
vertexSet before [0 3 4 5]
vertexSet before [0 1 2]
vertexSet after [0 6 7 8]
*minVertex 0
vertexSet after [0 1 2]
*minVertex 0
process p3
vertexSet before [0 6 7 8]
vertexSet after [7 0 3 4 5]
*minVertex 7
vertexSet after [0 6 7 8]
process p3
vertexSet before [0 6 7 8]
*minVertex 6
process p2
process p1
vertexSet before [0 1 2]
*minVertex 2
vertexSet after [0 6 7 8]
vertexSet after [0 1 2]
vertexSet before [7 0 3 4 5]
*minVertex 2
vertexSet after [2 7 0 3 4 5]
The graph:
g.graph = [[0, 4, 0, 0, 0, 0, 0, 8, 0],
[4, 0, 8, 0, 0, 0, 0, 11, 0],
[0, 8, 0, 7, 0, 4, 0, 0, 2],
[0, 0, 7, 0, 9, 14, 0, 0, 0],
[0, 0, 0, 9, 0, 10, 0, 0, 0],
[0, 0, 4, 14, 10, 0, 2, 0, 0],
[0, 0, 0, 0, 0, 2, 0, 1, 6],
[8, 11, 0, 0, 0, 0, 1, 0, 7],
[0, 0, 2, 0, 0, 0, 6, 7, 0]]