Неверный вывод данных многопроцессорной обработки Python

#python #numpy #multiprocessing

#python #numpy #многопроцессорная обработка

Вопрос:

Я пытаюсь выполнить многопроцессорную обработку на Python. Я написал некоторый код, который добавляет вектор, но не смог получить вывод из функции. Это означает, что на выходе Z выводится 0, а не 2.

 from multiprocessing import Process
import numpy as np

numThreads = 16
num = 16

numIter = num/numThreads

X = np.ones((num, 1))
Y = np.ones((num, 1))
Z = np.zeros((num, 1))

def add(X,Y,Z,j):
    Z[j] = X[j]   Y[j]

if __name__ == '__main__':
  jobs = []
  for i in range(numThreads):
    p = Process(target=add, args=(X, Y, Z, i,))
    jobs.append(p)

  for i in range(numThreads):
    jobs[i].start()

  for i in range(numThreads):
    jobs[i].join()

  print Z[0]
  

Редактировать: воспользовался советом clocker и изменил мой код на этот:

 import multiprocessing
import numpy as np

numThreads = 16
numRows = 32000
numCols = 2
numOut = 3

stride = numRows / numThreads

X = np.ones((numRows, numCols))
W = np.ones((numCols, numOut))
B = np.ones((numRows, numOut))
Y = np.ones((numRows, numOut))

def conv(idx):
  Y[idx*stride:idx*stride stride] = X[idx*stride:idx*stride stride].dot(W)   B[idx*stride:idx*stride stride]

if __name__=='__main__':
  pool = multiprocessing.Pool(numThreads)
  pool.map(conv, range(numThreads))
  print Y
  

И на выходе получается Y вместо Saxp.

Комментарии:

1. Что вы подразумеваете под «не удалось получить результат»?

Ответ №1:

Причина, по которой ваша последняя строка print Z[0] возвращает [0] вместо [2], заключается в том, что каждый из процессов создает независимую копию Z (или может быть Z[j] — не совсем уверен в этом) перед ее изменением. В любом случае, отдельный запуск процесса гарантирует, что ваша исходная версия останется неизменной.

Если бы вы использовали threading module вместо этого, последняя строка действительно вернула бы [2], как и ожидалось, но это не многопроцессорная обработка.

Итак, вы, вероятно, хотите использовать multiprocessing.Pool вместо этого. Продолжая ваш эксперимент исключительно для иллюстрации, можно было бы сделать следующее:

 In [40]: pool = multiprocessing.Pool()
In [41]: def add_func(j):
   ....:     return X[j]   Y[j]
In [42]: pool = multiprocessing.Pool(numThreads)
In [43]: pool.map(add_func, range(numThreads))
Out[43]: 
[array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.]),
 array([ 2.])]
  

Получайте удовольствие!

Что касается вашей второй части вашего вопроса, проблема в том, что функция conv() не возвращает никакого значения. В то время как пул процессов получает копию X, B и W для извлечения значений, Y внутри conv() является локальным для каждого запущенного процесса. Чтобы получить новое вычисленное значение Y, вы должны использовать что-то вроде этого:

 def conv(idx):
    Ylocal_section = X[idx*stride:idx*stride stride].dot(W)    B[idx*stride:idx*stride stride]
    return Ylocal_section 

results = pool.map(conv, range(numThreads)) # then apply each result to Y
for idx in range(numThreads):
    Y[idx*stride:idx*stride stride] = results[idx] 
  

Параллелизм может очень быстро усложниться, и на данный момент я бы оценил существующие библиотеки, которые могут выполнять быструю свертку 2D. библиотеки numpy и scipy могут быть очень эффективными и лучше удовлетворять вашим потребностям.

Комментарии:

1. Привет, если это не слишком сложно, не могли бы вы объяснить, как работает многопроцессорная обработка в Python. Например, как использовать range(numThreads) масштабирование для нескольких наборов данных? Я больше люблю потоки C 11. Синтаксис Python немного странный из-за этого.

2. диапазон (numThreads) просто преобразуется в [0, 1, 2, … numThreads-1] . Ввод строки In [43] эквивалентен: pool.map(add_func, [0,1,2,3, ..., 15]) . Функция map задокументирована в стандартной библиотеке python; она преобразуется в: for i in range(numThreads): add_func(i)

3. Привет, я изменил данные, чтобы использовать свертку. Я вижу ту же ошибку. Код, размещенный в вопросе.