Получение очень медленных итераций в цикле, выполняемых над массивом данных с использованием Xarray и Dask

#python #dask #netcdf #python-xarray

#python #dask #netcdf #python-xarray

Вопрос:

Я пытаюсь вычислить скорость ветра из компонентов u и v для данных за 1 год с часовым шагом и разрешением 0,1 x 0,1 градуса в общей сложности за 40 лет. Отдельные u и v netcdf-файлы за 1 год составляют около 5 ГБ каждый. Я реализовал базовый for цикл, в котором u и v файлов netcdf для каждого года открываются через Xarray open_dataset и повторно обрабатываются, чтобы получить их в виде массивов dask, после чего выполняются вычисления и экспортируется результат как новый netcdf. При запуске цикла первая итерация выполняется почти мгновенно, но затем цикл занимает слишком много времени для следующей итерации (почти до такой степени, что кажется, что он остановился). Я не понимаю, какая часть моего кода здесь является узким местом и почему. Любая помощь будет оценена. Кроме того, я правильно реализовал планировщик dask для адаптивного запроса ресурсов. Я прилагаю соответствующий фрагмент кода для справки :

     cluster = PBSCluster(cores=1,memory='8GB',queue='standard',project='civil',interface='ib0',walltime='00:20:00')
    cluster.adapt(minimum=1, maximum=8)
    client = Client(cluster) 
    for i in range (1979,2019):
        u_dir = glob.glob('../u_wind/uwind_hourly_'  str(i) '*.nc')
        v_dir = glob.glob('../v_wind/vwind_hourly_'  str(i) '*.nc')
        w_dir = './wind/wind_hourly_' str(i) '-' str(i) '.nc'
        u_wind = xr.open_dataset(u_dir[0])
        v_wind = xr.open_dataset(v_dir[0])
        u_wind_rechunk = u_wind.chunk({'time':720})
        v_wind_rechunk = v_wind.chunk({'time':720}) 
        u_var = u_wind_rechunk['UGRD_10m']
        v_var = v_wind_rechunk['VGRD_10m'] 
        wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
        wind_speed=wind_speed.assign(wind_speed=np.sqrt(u_var**2   v_var**2))
        wind_speed.to_netcdf(w_dir)
        del u_wind
        del v_wind
        del u_wind_rechunk
        del v_wind_rechunk
        del u_var
        del v_var
        del wind_speed
        gc.collect()

 

Комментарии:

1. С некоторыми устранениями неполадок я вижу улучшение производительности при использовании cluster.scale() вместо cluster.adapt().

Ответ №1:

Как бы то ни было, ваш код по-прежнему выглядит последовательным, а не параллельным, что, в частности wind_speed.to_netcdf(w_dir) , сразу запускает вычисления. Приведенный ниже код может потребовать некоторой корректировки, но главное — распараллелить ваши операции:

 def single_run(i):
# nothing is modified in the code below relative
    u_dir = glob.glob('../u_wind/uwind_hourly_'  str(i) '*.nc')
    v_dir = glob.glob('../v_wind/vwind_hourly_'  str(i) '*.nc')
    w_dir = './wind/wind_hourly_' str(i) '-' str(i) '.nc'
    u_wind = xr.open_dataset(u_dir[0])
    v_wind = xr.open_dataset(v_dir[0])
    u_wind_rechunk = u_wind.chunk({'time':720})
    v_wind_rechunk = v_wind.chunk({'time':720}) 
    u_var = u_wind_rechunk['UGRD_10m']
    v_var = v_wind_rechunk['VGRD_10m'] 
    wind_speed = xr.Dataset(data_vars=None, coords=None, attrs=None)
    wind_speed=wind_speed.assign(wind_speed=np.sqrt(u_var**2   v_var**2))
    wind_speed.to_netcdf(w_dir)
    del u_wind
    del v_wind
    del u_wind_rechunk
    del v_wind_rechunk
    del u_var
    del v_var
    del wind_speed
    gc.collect()

# new parts
import dask

run_me = dask.compute([dask.delayed(single_run)(i) for i in range (1979,2019)])