#python #arrays #numpy #parallel-processing #dask
#python #массивы #numpy #параллельная обработка #dask
Вопрос:
У меня есть массив numpy координат размером n_slice x 2048 x 3, где n_slice исчисляется десятками тысяч. Я хочу применить следующую операцию к каждому фрагменту размером 2048 x 3 отдельно
import numpy as np
from scipy.spatial.distance import pdist
# load coor from a binary xyz file, dcd format
n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])
# this loop is what I want to parallelize, each slice is completely independent
for i in xrange(n_slice):
dist[i, r[:, None] < r] = pdist(coor[i])
Я попытался использовать Dask, создав coor
dask.array
,
import dask.array as da
dcoor = da.from_array(coor, chunks=(1, 2048, 3))
но простая замена coor
на dcoor
не выявит параллелизм. Я мог бы увидеть настройку параллельных потоков для запуска для каждого фрагмента, но как мне использовать Dask для обработки параллелизма?
Вот параллельная реализация с использованием concurrent.futures
import concurrent.futures
import multiprocessing
n_cpu = multiprocessing.cpu_count()
def get_dist(coor, dist, r):
dist[r[:, None] < r] = pdist(coor)
# load coor from a binary xyz file, dcd format
n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])
with concurrent.futures.ThreadPoolExecutor(max_workers=n_cpu) as executor:
for i in xrange(n_slice):
executor.submit(get_dist, cool[i], dist[i], r)
Возможно, эта проблема не очень подходит для Dask, поскольку нет вычислений между блоками.
Комментарии:
Ответ №1:
map_blocks
Метод map_blocks может быть полезен:
dcoor.map_blocks(pdist)
Неравномерные массивы
Похоже, вы делаете немного причудливую нарезку, чтобы вставить определенные значения в определенные местоположения выходного массива. Вероятно, это будет неудобно делать с dask.arrays. Вместо этого я рекомендую создать функцию, которая создает массив numpy
def myfunc(chunk):
values = pdist(chunk[0, :, :])
output = np.zeroes((2048, 2048))
r = np.arange(2048)
output[r[:, None] < r] = values
return output
dcoor.map_blocks(myfunc)
delayed
В худшем случае вы всегда можете использовать dask.delayed
from dask import delayed, compute
coor2 = delayed(coor)
slices = [coor2[i] for i in range(coor.shape[0])]
slices2 = [delayed(pdist)(slice) for slice in slices]
results = compute(*slices2)
Комментарии:
1. Выходные
pdist
данные представляют собой массив расстояний между каждой парой координат в срезе. В моем примере результат каждого среза имеет размеры(2048 * 2047 / 2,)
. Этих массивов на самом деле достаточно. Я думаю, что этот ответ обеспечивает хорошее начало, хотя я думаюmap_blocks
, что он пытается выполнятьсяpdist
так, как если бы я передал весь 3D-массив целиком. Я хочу, чтобы он вычислялся только на 2D-фрагментах, которые я разделил на куски; не должно быть никаких перекрестных вычислений.2. Он вызывает pdist для каждого
(1, 2048, 3)
размера фрагмента. Я бы обернул вашу функцию pdist во что-то, что извлекает первое измерение, а затем возвращает полную квадратную матрицу. Отредактировал мой ответ выше.