Как я могу использовать Dask для выполнения параллельных операций над фрагментами массивов NumPy?

#python #arrays #numpy #parallel-processing #dask

#python #массивы #numpy #параллельная обработка #dask

Вопрос:

У меня есть массив numpy координат размером n_slice x 2048 x 3, где n_slice исчисляется десятками тысяч. Я хочу применить следующую операцию к каждому фрагменту размером 2048 x 3 отдельно

 import numpy as np
from scipy.spatial.distance import pdist

# load coor from a binary xyz file, dcd format

n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

# this loop is what I want to parallelize, each slice is completely independent
for i in xrange(n_slice): 
    dist[i, r[:, None] < r] = pdist(coor[i])
 

Я попытался использовать Dask, создав coor dask.array ,

 import dask.array as da
dcoor = da.from_array(coor, chunks=(1, 2048, 3))
 

но простая замена coor на dcoor не выявит параллелизм. Я мог бы увидеть настройку параллельных потоков для запуска для каждого фрагмента, но как мне использовать Dask для обработки параллелизма?

Вот параллельная реализация с использованием concurrent.futures

 import concurrent.futures
import multiprocessing

n_cpu = multiprocessing.cpu_count()

def get_dist(coor, dist, r):
    dist[r[:, None] < r] = pdist(coor)

# load coor from a binary xyz file, dcd format

n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

with concurrent.futures.ThreadPoolExecutor(max_workers=n_cpu) as executor:
    for i in xrange(n_slice):
        executor.submit(get_dist, cool[i], dist[i], r)
 

Возможно, эта проблема не очень подходит для Dask, поскольку нет вычислений между блоками.

Комментарии:

1. dask.pydata.org/en/latest/…

Ответ №1:

map_blocks

Метод map_blocks может быть полезен:

 dcoor.map_blocks(pdist)
 

Неравномерные массивы

Похоже, вы делаете немного причудливую нарезку, чтобы вставить определенные значения в определенные местоположения выходного массива. Вероятно, это будет неудобно делать с dask.arrays. Вместо этого я рекомендую создать функцию, которая создает массив numpy

 def myfunc(chunk):
    values = pdist(chunk[0, :, :])
    output = np.zeroes((2048, 2048))
    r = np.arange(2048)
    output[r[:, None] < r] = values
    return output

dcoor.map_blocks(myfunc)
 

delayed

В худшем случае вы всегда можете использовать dask.delayed

 from dask import delayed, compute
coor2 = delayed(coor)
slices = [coor2[i] for i in range(coor.shape[0])]
slices2 = [delayed(pdist)(slice) for slice in slices]
results = compute(*slices2)
 

Комментарии:

1. Выходные pdist данные представляют собой массив расстояний между каждой парой координат в срезе. В моем примере результат каждого среза имеет размеры (2048 * 2047 / 2,) . Этих массивов на самом деле достаточно. Я думаю, что этот ответ обеспечивает хорошее начало, хотя я думаю map_blocks , что он пытается выполняться pdist так, как если бы я передал весь 3D-массив целиком. Я хочу, чтобы он вычислялся только на 2D-фрагментах, которые я разделил на куски; не должно быть никаких перекрестных вычислений.

2. Он вызывает pdist для каждого (1, 2048, 3) размера фрагмента. Я бы обернул вашу функцию pdist во что-то, что извлекает первое измерение, а затем возвращает полную квадратную матрицу. Отредактировал мой ответ выше.