Как использовать xarray apply_ufunc для вычисления алгоритма для пикселя с несколькими входами

#python #numpy #python-xarray

Вопрос:

Я пытаюсь выполнить анализ драйверов для нескольких переменных xarray в форме:

введите описание изображения здесь

На самом деле это очень большой набор данных dask. Я хочу знать, сколько отклонений фиксируется каждой из этих переменных во времени в каждом пикселе. Я нашел библиотеку Kruskals (https://github.com/Rambatino/Kruskals ), который выполняет анализ этого ключевого драйвера, но только для массива numpy или фрейма данных pandas.

Я могу вычислить один пиксель, используя этот метод, и подумал, что он должен быть достаточно простым для вычисления каждого пикселя в xarray apply_ufunc , но у меня возникли проблемы, с которыми вы могли бы мне помочь

 # Load example Imports
from Kruskals import Kruskals #https://github.com/Rambatino/Kruskals
import numpy as np
import xarray as xr

# Make Fake Data
times = pd.date_range('2019-01-01', '2019-01-10', name='time')
y=np.arange(0,8)
x=np.arange(0,8)
data1 = xr.DataArray(np.random.rand(10, 8, 8), dims=["time", "y", "x"], coords={'time': times,'x':x,'y':y})
data2 = xr.DataArray(np.random.rand(10, 8, 8), dims=["time", "y", "x"], coords={'time': times,'x':x,'y':y})
data3 = xr.DataArray(np.random.rand(10, 8, 8), dims=["time", "y", "x"], coords={'time': times,'x':x,'y':y})
data4 = xr.DataArray(np.random.rand(10, 8, 8), dims=["time", "y", "x"], coords={'time': times,'x':x,'y':y})
data5 = xr.DataArray(np.random.rand(10, 8, 8), dims=["time", "y", "x"], coords={'time': times,'x':x,'y':y})
data1.name='data1'
data2.name='data2'
data3.name='data3'
data4.name='data4'
data5.name='data5'
data=xr.merge([data1,data2,data3,data4,data5])
data = data.chunk({'time':10, 'x':1, 'y':1})


# This method works for a single pixel
cutout=data.sel(x=2,y=2)
cutout_pd=cutout.to_pandas().drop(['x','y'],axis=1)
#labels
cols=list(cutout_pd)
y=cols[0] # To tell Kruskals which columns to use. Can just just Kruskals() direct with np arrays instead though.
x=cols[1:] 

kru=Kruskals.from_pandas_df(cutout_pd, x, y).driver_score_to_series()
print(kru)

# But have tried a few hours and can't work out how to vectorise this

def test_apply_kruskals(a,b,c,d,e):
    drivers=Kruskals(a,[b,c,d,e])
    drivers_series=drivers.driver_score_to_series()
    return drivers_series



dr=xr.apply_ufunc(test_apply_kruskals,
                  data['data1'], # Independent Variable
                  data['data2'], # Depends Vars
                  data['data3'], # Depends Vars
                  data['data4'], # Depends Vars
                  data['data5'], # Depends Vars
                  input_core_dims=[["time"],["time"],["time"],["time"],["time"]],dask='allowed',vectorize=True,output_core_dims=[['drivers']])
# Throws TypeError: only integer scalar arrays can be converted to a scalar index

 

Я буквально просто хочу apply_ufunc, но только вдоль оси времени. Я так запутался со всеми входными, выходными, исключающими и основными измерениями здесь.

Редактировать: я думаю dask='parallelized' , что исправлена часть этой проблемы. это кажется близким решением, но у меня все еще возникают серьезные проблемы с памятью в большом наборе данных dask.

 # I think this solution is close but it still gets killed workers.

def test_apply_kruskals(a,b,c,d,e):
    #print(np.array([b,c,d,e]))
    print(a.shape)
    drivers=Kruskals(np.array([b,c,d,e]),np.array(a))
    
    driver_score=drivers.driver_score(directional=True)
    return driver_score

dr=xr.apply_ufunc(test_apply_kruskals,
                  data['data1'], # Independent Variable
                  data['data2'], # Depends Vars
                  data['data3'], # Depends Vars
                  data['data4'], # Depends Vars
                  data['data5'], # Depends Vars
                  input_core_dims=[["time"],["time"],["time"],["time"],["time"]],
                  dask='parallelized',
                  vectorize=True,
                  output_core_dims=[['drivers']],
                  output_dtypes=['float64'],
                  dask_gufunc_kwargs={'output_sizes':{"drivers": 4}})
 

Большое спасибо за вашу помощь!

Комментарии:

1. Что касается проблемы с памятью, вы можете ознакомиться с dask.distributed scheduler . Это позволяет вам ограничить объем памяти, который Dask должен приблизительно использовать. Это предотвращает гибель рабочих. см. examples.dask.org/xarray.html