несинхронизированные значения при использовании многопроцессорной обработки.Процесс() в Python

#python #multiprocessing #python-multiprocessing #dijkstra

Вопрос:

Я пытаюсь распараллелить алгоритм Дейкстры. Идея в том, чтобы:

  • Разделите все узлы на наборы (количество наборов = количество процессоров: vertexSets[i] )
  • Найдите минимальную вершину (с минимальным расстоянием) в каждом наборе с minDistance()
  • Отправьте эту минимальную вершину всем процессам (добавьте эту вершину в набор каждого процесса)

Я использую multiprocessing.Value переменную with minVertex для сохранения новой минимальной вершины каждый раз, когда процесс узнает об этом, а затем обновляю эту минимальную вершину для всех процессов. 3 процесса, которые я использую, запускают одну и ту же функцию, чтобы нажать и получить значение minVertex . Я новичок multiprocessing в Python, так что, возможно, то, что я использую здесь, не является правильным или лучшим.

Мин вершин перечень должен включать: [0,1,2] 1 от процесса и [7,6] от процесса 2. Эти значения должны быть добавлены в vertexSet[] очередь процессов 1, 2, 3. Однако, после того, как minVertex .стоимость для обновления до всех процессов, иногда minVertex изменяется на новое значение, прежде чем он будет добавлен в vertexSet некоторых процессов. Начальная наборы: [0,1,2] для процесса 1; [3,4,5] для процесса 2; [6,7,8] для процесса 3. Часть результата, который я получил ниже

 def parallelDijkstra(self, src, vertexSet, minVertex, lock):
    # print("vertexSet size", vertexSet.size)
    dist = [sys.maxsize] * self.V # assign all nodes of the whole set = infinity
    dist[src] = 0
    sptSet = [False] * self.V # all nodes are False: no node has been visited yet
    before = [-1] * self.V
    u = 0
    for i in range(vertexSet.size):
        u = self.minDistance(dist, sptSet, vertexSet) # extract the min vertext
        sptSet[u] = True

        lock.acquire()
        minVertex.value = u

        print("process", multiprocessing.current_process().name)
        print("vertexSet before", vertexSet)
        print("*minVertex", minVertex.value)
        vertexSet = np.append(minVertex.value, vertexSet)
        print("vertexSet after", vertexSet)
        print(" ")
        lock.release()
 
 def separateDataProcessors(self, src, numProcessors):
    vertices = []
    for i in range(self.V):
        vertices.append(i)
    vertexSets = np.array_split(vertices, numProcessors)
    # [array([0, 1, 2]), array([3, 4, 5]), array([6, 7, 8])]

    # the min-distance storing vertex to update to all processes
    minVertex = multiprocessing.Value("i")

    lock = multiprocessing.Lock()


    # creating processes
    # for i in range(numProcessors):
    p1 = multiprocessing.Process(target=self.parallelDijkstra, name="p1", args=(src, vertexSets[0], minVertex, lock))
    p2 = multiprocessing.Process(target=self.parallelDijkstra, name="p2", args=(src, vertexSets[1], minVertex, lock))
    p3 = multiprocessing.Process(target=self.parallelDijkstra, name="p3", args=(src, vertexSets[2], minVertex, lock))

    # starting processes
    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()
 
 process p1
process p2
vertexSet before [0 3 4 5]
vertexSet before [0 1 2]
vertexSet after [0 6 7 8]
*minVertex 0

vertexSet after [0 1 2]
*minVertex 0
process p3

vertexSet before [0 6 7 8]
vertexSet after [7 0 3 4 5]
*minVertex 7
vertexSet after [0 6 7 8]

process p3
vertexSet before [0 6 7 8]

*minVertex 6
process p2
process p1
vertexSet before [0 1 2]
*minVertex 2
vertexSet after [0 6 7 8]
vertexSet after [0 1 2]

vertexSet before [7 0 3 4 5]

*minVertex 2
vertexSet after [2 7 0 3 4 5]
 

The graph:

 g.graph = [[0, 4, 0, 0, 0, 0, 0, 8, 0],
           [4, 0, 8, 0, 0, 0, 0, 11, 0],
           [0, 8, 0, 7, 0, 4, 0, 0, 2],
           [0, 0, 7, 0, 9, 14, 0, 0, 0],
           [0, 0, 0, 9, 0, 10, 0, 0, 0],
           [0, 0, 4, 14, 10, 0, 2, 0, 0],
           [0, 0, 0, 0, 0, 2, 0, 1, 6],
           [8, 11, 0, 0, 0, 0, 1, 0, 7],
           [0, 0, 2, 0, 0, 0, 6, 7, 0]]