обновление значений словаря в mpi4py

#python #performance #parallel-processing #mpi #mpi4py

Вопрос:

Как мы можем обновить один глобальный словарь в MPI (в частности, mpi4py) на разных процессорах. Проблема, с которой я сталкиваюсь сейчас после трансляции, заключается в том, что разные процессоры не могут видеть изменения (обновление) в словаре, внесенные другими процессорами.

например, входные данные выглядят следующим образом:

    col1  col2
   -----------
    a      1
    a      1
    b      2
    c      3
    c      1
 

выходной словарь должен быть следующим:

   {'a': 2, 'b': 2, 'c': 4}
 

это означает, что col2 во входных данных суммируются и создают значение для ключей (col1). Словарь изначально пуст и обновляется во время параллельной обработки всеми процессорами (по крайней мере, это то, что мы пытаемся сделать).

Ответ №1:

Как мы можем обновить один глобальный словарь в MPI (в частности, mpi4py) на разных процессорах. Проблема, с которой я сталкиваюсь сейчас после трансляции, заключается в том, что разные процессоры не могут видеть изменения (обновление) в словаре, внесенные другими процессорами.

Во-первых, вам нужно понять, что в MPI каждый процесс MPI запускает полную копию программы. Следовательно, все данные, выделенные в этой программе, являются частными для каждого процесса.

Давайте рассмотрим следующий пример:

 from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    dictionary = {'a': 1, 'c': 3}
    for i in range(1, size, 1):
        data = comm.recv(source=i, tag=11)
        for key in data:
            if key in dictionary:
               dictionary[key] = dictionary[key]   data[key]
            else:
               dictionary[key] = data[key] 
    print(dictionary)
else:
    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)
 

В этом коде процесс с rank=0 выделяет a dictionary , который является частным для этого процесса , таким же образом, что data = {'a': 1, 'b': 2, 'c': 1} является частным для каждого из других процессов. Если (например) процесс изменяет переменную size , это изменение не будет видно другим процессам.

В этом коде все процессы отправляют свою копию словаря:

     data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)
 

к процессу 0, который вызывает comm.recv каждый из других процессов:

 for i in range(1, size, 1):
    data = comm.recv(source=i, tag=11)
 

и объединяет полученные данные (от других процессов) в свой собственный словарь:

     for key in data:
        if key in dictionary:
           dictionary[key] = dictionary[key]   data[key]
        else:
           dictionary[key] = data[key] 
 

в конце концов, только процесс 0 имеет полное завершение dictionary . То же самое произошло и с вами, когда вы вели трансляцию. Тем не менее, в MPI есть процедуры (т. Е. comm.Allgather ), которые позволят вам использовать все dictionary во всех процессах.

Пример такого кода (вам просто нужно адаптироваться к словарю):

 from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendBuffer = numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(size, dtype=bool)

print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allgather([sendBuffer,  MPI.BOOL],[recvBuffer, MPI.BOOL])
print("After Allgather  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
MacBook-Pro-de-Bruno:Python dreamcrash$ 
 

Словарь изначально пуст и обновляется во время
параллельной обработки всеми процессорами (по крайней мере, это то, что мы
пытаемся сделать).

С помощью вышеупомянутой модели (т. Е. парадигмы распределенной памяти) вам потребуется явно взаимодействовать со всеми процессами каждый раз, когда один из них будет изменять словарь. Это означает, что вам нужно будет заранее знать пункты кода, в которых вы должны осуществлять эти сообщения.

Однако, основываясь на вашем тексте, вам, похоже, нужен подход с общей памятью, при котором процесс обновлял бы словарь, например, следующим образом :

     if key in dictionary:
       dictionary[key] = dictionary[key]   data[key]
    else:
       dictionary[key] = data[key] 
 

и эти изменения будут немедленно видны всем процессам. Точно так же, как происходит в многопоточном коде.

MPI 3.0 вводит концепцию общей памяти, где этого действительно можно достичь.

Вот пример использования массивов:

 from mpi4py import MPI 
import numpy as np 

comm = MPI.COMM_WORLD 

size = 1000 
itemsize = MPI.DOUBLE.Get_size() 
if comm.Get_rank() == 0: 
   nbytes = size * itemsize 
else: 
   nbytes = 0 

win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=comm) 

buf, itemsize = win.Shared_query(0) 
assert itemsize == MPI.DOUBLE.Get_size() 
buf = np.array(buf, dtype='B', copy=False) 
ary = np.ndarray(buffer=buf, dtype='d', shape=(size,)) 

if comm.rank == 1: 
  ary[:5] = np.arange(5) 
 
comm.Barrier() 
if comm.rank == 0: 
  print(ary[:10])
 

Код не мой, он исходит отсюда.