#python #performance #parallel-processing #mpi #mpi4py
Вопрос:
Как мы можем обновить один глобальный словарь в MPI (в частности, mpi4py) на разных процессорах. Проблема, с которой я сталкиваюсь сейчас после трансляции, заключается в том, что разные процессоры не могут видеть изменения (обновление) в словаре, внесенные другими процессорами.
например, входные данные выглядят следующим образом:
col1 col2
-----------
a 1
a 1
b 2
c 3
c 1
выходной словарь должен быть следующим:
{'a': 2, 'b': 2, 'c': 4}
это означает, что col2 во входных данных суммируются и создают значение для ключей (col1). Словарь изначально пуст и обновляется во время параллельной обработки всеми процессорами (по крайней мере, это то, что мы пытаемся сделать).
Ответ №1:
Как мы можем обновить один глобальный словарь в MPI (в частности, mpi4py) на разных процессорах. Проблема, с которой я сталкиваюсь сейчас после трансляции, заключается в том, что разные процессоры не могут видеть изменения (обновление) в словаре, внесенные другими процессорами.
Во-первых, вам нужно понять, что в MPI каждый процесс MPI запускает полную копию программы. Следовательно, все данные, выделенные в этой программе, являются частными для каждого процесса.
Давайте рассмотрим следующий пример:
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
dictionary = {'a': 1, 'c': 3}
for i in range(1, size, 1):
data = comm.recv(source=i, tag=11)
for key in data:
if key in dictionary:
dictionary[key] = dictionary[key] data[key]
else:
dictionary[key] = data[key]
print(dictionary)
else:
data = {'a': 1, 'b': 2, 'c': 1}
comm.send(data, dest=0, tag=11)
В этом коде процесс с rank=0
выделяет a dictionary
, который является частным для этого процесса , таким же образом, что data = {'a': 1, 'b': 2, 'c': 1}
является частным для каждого из других процессов. Если (например) процесс изменяет переменную size
, это изменение не будет видно другим процессам.
В этом коде все процессы отправляют свою копию словаря:
data = {'a': 1, 'b': 2, 'c': 1}
comm.send(data, dest=0, tag=11)
к процессу 0, который вызывает comm.recv
каждый из других процессов:
for i in range(1, size, 1):
data = comm.recv(source=i, tag=11)
и объединяет полученные данные (от других процессов) в свой собственный словарь:
for key in data:
if key in dictionary:
dictionary[key] = dictionary[key] data[key]
else:
dictionary[key] = data[key]
в конце концов, только процесс 0 имеет полное завершение dictionary
. То же самое произошло и с вами, когда вы вели трансляцию. Тем не менее, в MPI есть процедуры (т. Е. comm.Allgather
), которые позволят вам использовать все dictionary
во всех процессах.
Пример такого кода (вам просто нужно адаптироваться к словарю):
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
sendBuffer = numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(size, dtype=bool)
print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allgather([sendBuffer, MPI.BOOL],[recvBuffer, MPI.BOOL])
print("After Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
MacBook-Pro-de-Bruno:Python dreamcrash$
Словарь изначально пуст и обновляется во время
параллельной обработки всеми процессорами (по крайней мере, это то, что мы
пытаемся сделать).
С помощью вышеупомянутой модели (т. Е. парадигмы распределенной памяти) вам потребуется явно взаимодействовать со всеми процессами каждый раз, когда один из них будет изменять словарь. Это означает, что вам нужно будет заранее знать пункты кода, в которых вы должны осуществлять эти сообщения.
Однако, основываясь на вашем тексте, вам, похоже, нужен подход с общей памятью, при котором процесс обновлял бы словарь, например, следующим образом :
if key in dictionary:
dictionary[key] = dictionary[key] data[key]
else:
dictionary[key] = data[key]
и эти изменения будут немедленно видны всем процессам. Точно так же, как происходит в многопоточном коде.
MPI 3.0 вводит концепцию общей памяти, где этого действительно можно достичь.
Вот пример использования массивов:
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = 1000
itemsize = MPI.DOUBLE.Get_size()
if comm.Get_rank() == 0:
nbytes = size * itemsize
else:
nbytes = 0
win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=comm)
buf, itemsize = win.Shared_query(0)
assert itemsize == MPI.DOUBLE.Get_size()
buf = np.array(buf, dtype='B', copy=False)
ary = np.ndarray(buffer=buf, dtype='d', shape=(size,))
if comm.rank == 1:
ary[:5] = np.arange(5)
comm.Barrier()
if comm.rank == 0:
print(ary[:10])
Код не мой, он исходит отсюда.