#python #for-loop #parallel-processing #multiprocessing #dask
#python #цикл for #параллельная обработка #многопроцессорность #dask
Вопрос:
У меня есть функция var
. Я хочу знать наилучший возможный способ быстрого запуска цикла for (для нескольких координат: xs и ys) в рамках этой функции путем многопроцессорной / параллельной обработки с использованием всех процессоров, ядер и оперативной памяти, имеющихся в системе.
Возможно ли это с помощью Dask
модуля?
pysheds
документацию можно найти здесь.
import numpy as np
from pysheds.grid import Grid
xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
for (x,y) in zip(xs,ys):
grid = Grid.from_raster('E:/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=1500, xytype='label')
....
....
results
Комментарии:
1. Вы можете попробовать
numba
.
Ответ №1:
Я попытался дать воспроизводимый код ниже, используя dask
. Вы можете добавить основную часть обработки pysheds
или любые другие функции в нее для более быстрой параллельной итерации параметров.
Документацию dask
модуля можно найти здесь.
import dask
from dask import delayed, compute
from dask.distributed import Client, progress
from pysheds.grid import Grid
client = Client(threads_per_worker=2, n_workers=2) #Choose the number of workers and threads per worker over here to deploy for your task.
xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
#Firstly, a function has to be created, where the iteration of the parameters is involved.
def var(x,y):
grid = Grid.from_raster('data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=1500, xytype='label')
...
...
return (result)
#Now calling the function in a 'dask' way.
lazy_results = []
for (x,y) in zip(xs,ys):
lazy_result = dask.delayed(var)(x,y)
lazy_results.append(lazy_result)
#Final command to execute the function var(x,y) and get the result.
dask.compute(*lazy_results)
Ответ №2:
Вы не разместили ссылку на свой image1.tif
файл, поэтому приведенный ниже пример кода использует pysheds/data/dem.tif
from https://github.com/mdbartos/pysheds Основная идея состоит в том, чтобы разделить входные параметры, xs
и ys
в вашем случае, на подмножества, а затем предоставить каждому процессору другое подмножество для работы.
main()
вычисляет решение дважды, один раз последовательно и один раз параллельно, затем сравнивает решения из каждого. В параллельном решении есть некоторая неэффективность, поскольку файл изображения будет считываться каждым процессором, поэтому есть возможности для улучшения (т. Е. Прочитайте файл изображения за пределами параллельной части, а затем передайте полученный grid
объект каждому экземпляру).
import numpy as np
from pysheds.grid import Grid
from dask.distributed import Client
from dask import delayed, compute
xs = 10, 20, 30, 40, 50, 60, 70, 80, 90, 100
ys = 25, 35, 45, 55, 65, 75, 85, 95, 105, 115, 125
def var(image_file, x_in, y_in):
grid = Grid.from_raster(image_file, data_name='map')
variable_avg = []
for (x,y) in zip(x_in,y_in):
grid.catchment(data='map', x=x, y=y, out_name='catch')
variable = grid.view('catch', nodata=np.nan)
variable_avg.append( np.array(variable).mean() )
return(variable_avg)
def var_parallel(n_cpu, image_file, x_in, y_in):
tasks = []
for cpu in range(n_cpu):
x_in = xs[cpu::n_cpu] # eg, cpu = 0: x_in = (10, 40, 70, 100)
y_in = ys[cpu::n_cpu] #
tasks.append( delayed(var)(image_file, x_in, y_in) )
ans = compute(tasks)
# reassemble solution in the right order
par_avg = [None]*len(xs)
for cpu in range(n_cpu):
par_avg[cpu::n_cpu] = ans[0][cpu]
print('AVG (parallel) =',par_avg)
return par_avg
def main():
image_file = 'pysheds/data/dem.tif'
# sequential solution:
seq_avg = var(image_file, xs, ys)
print('AVG (sequential)=',seq_avg)
# parallel solution:
n_cpu = 3
dask_client = Client(n_workers=n_cpu)
par_avg = var_parallel(n_cpu, image_file, xs, ys)
dask_client.shutdown()
print('max error=',
max([ abs(seq_avg[i]-par_avg[i]) for i in range(len(seq_avg))]))
if __name__ == '__main__': main()
Комментарии:
1. Я не понимаю, почему вы не используете: multiprocessing. Пул? и установите размер пула на количество процессоров или используйте os.cpu_count() и выполните: для x, y в zip(xs, ys): _async_proc = processes_pool.apply_async(var, (imag_name, x, y))