#python #dask #netcdf #python-xarray
#python #dask #netcdf #python-xarray
Вопрос:
Я пытаюсь вычислить скорость ветра из компонентов u и v для данных за 1 год с часовым шагом и разрешением 0,1 x 0,1 градуса в общей сложности за 40 лет. Отдельные u и v netcdf-файлы за 1 год составляют около 5 ГБ каждый. Я реализовал базовый for
цикл, в котором u и v файлов netcdf для каждого года открываются через Xarray open_dataset
и повторно обрабатываются, чтобы получить их в виде массивов dask, после чего выполняются вычисления и экспортируется результат как новый netcdf. При запуске цикла первая итерация выполняется почти мгновенно, но затем цикл занимает слишком много времени для следующей итерации (почти до такой степени, что кажется, что он остановился). Я не понимаю, какая часть моего кода здесь является узким местом и почему. Любая помощь будет оценена. Кроме того, я правильно реализовал планировщик dask для адаптивного запроса ресурсов. Я прилагаю соответствующий фрагмент кода для справки :
cluster = PBSCluster(cores=1,memory='8GB',queue='standard',project='civil',interface='ib0',walltime='00:20:00')
cluster.adapt(minimum=1, maximum=8)
client = Client(cluster)
for i in range (1979,2019):
u_dir = glob.glob('../u_wind/uwind_hourly_' str(i) '*.nc')
v_dir = glob.glob('../v_wind/vwind_hourly_' str(i) '*.nc')
w_dir = './wind/wind_hourly_' str(i) '-' str(i) '.nc'
u_wind = xr.open_dataset(u_dir[0])
v_wind = xr.open_dataset(v_dir[0])
u_wind_rechunk = u_wind.chunk({'time':720})
v_wind_rechunk = v_wind.chunk({'time':720})
u_var = u_wind_rechunk['UGRD_10m']
v_var = v_wind_rechunk['VGRD_10m']
wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
wind_speed=wind_speed.assign(wind_speed=np.sqrt(u_var**2 v_var**2))
wind_speed.to_netcdf(w_dir)
del u_wind
del v_wind
del u_wind_rechunk
del v_wind_rechunk
del u_var
del v_var
del wind_speed
gc.collect()
Комментарии:
1. С некоторыми устранениями неполадок я вижу улучшение производительности при использовании cluster.scale() вместо cluster.adapt().
Ответ №1:
Как бы то ни было, ваш код по-прежнему выглядит последовательным, а не параллельным, что, в частности wind_speed.to_netcdf(w_dir)
, сразу запускает вычисления. Приведенный ниже код может потребовать некоторой корректировки, но главное — распараллелить ваши операции:
def single_run(i):
# nothing is modified in the code below relative
u_dir = glob.glob('../u_wind/uwind_hourly_' str(i) '*.nc')
v_dir = glob.glob('../v_wind/vwind_hourly_' str(i) '*.nc')
w_dir = './wind/wind_hourly_' str(i) '-' str(i) '.nc'
u_wind = xr.open_dataset(u_dir[0])
v_wind = xr.open_dataset(v_dir[0])
u_wind_rechunk = u_wind.chunk({'time':720})
v_wind_rechunk = v_wind.chunk({'time':720})
u_var = u_wind_rechunk['UGRD_10m']
v_var = v_wind_rechunk['VGRD_10m']
wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
wind_speed=wind_speed.assign(wind_speed=np.sqrt(u_var**2 v_var**2))
wind_speed.to_netcdf(w_dir)
del u_wind
del v_wind
del u_wind_rechunk
del v_wind_rechunk
del u_var
del v_var
del wind_speed
gc.collect()
# new parts
import dask
run_me = dask.compute([dask.delayed(single_run)(i) for i in range (1979,2019)])