Параллельная реализация поиска по сетке ARIMA на Python с помощью Joblib

#python #parallel-processing #time-series #arima #joblib

Вопрос:

Я пытаюсь изменить следующий код с помощью joblib, чтобы он мог работать параллельно на арендованном на vast.ai многоядерном процессоре.

 # Create a differenced series
def difference(dataset, interval=1):
    """Return the order-`interval` differenced series of `dataset` as a numpy array."""
    return numpy.array(
        [dataset[idx] - dataset[idx - interval] for idx in range(interval, len(dataset))]
    )

# invert differenced value
def inverse_difference(history, yhat, interval=1):
    """Add back the seasonal value removed by differencing.

    The original line lost its '+' operator to HTML escaping and was a
    syntax error; the intended computation is yhat + history[-interval].
    """
    return yhat + history[-interval]

# evaluate an ARIMA model for a given order (p,d,q) and return RMSE
def evaluate_arima_model(X, arima_order):
    """Walk-forward validation of an ARIMA(p,d,q) on weekly-differenced data.

    The first half of X is the initial training window; each test point is
    forecast one step ahead and then appended to the history.
    Returns the out-of-sample RMSE.
    """
    X = X.astype('float32')
    split = int(len(X) * 0.50)
    train, test = X[:split], X[split:]
    history = list(train)

    days_in_week = 7  # seasonal lag used for differencing
    predictions = []
    for observed in test:
        # remove the weekly seasonal component before fitting
        diff = difference(history, days_in_week)
        fitted = ARIMA(diff, order=arima_order).fit(trend='nc', disp=0)
        forecast = fitted.forecast()[0]
        # add the seasonal component back to return to the original scale
        predictions.append(inverse_difference(history, forecast, days_in_week))
        history.append(observed)

    # out-of-sample error over the whole walk-forward pass
    return sqrt(mean_squared_error(test, predictions))

# evaluate combinations of p, d and q values for an ARIMA model
def evaluate_models(dataset, p_values, d_values, q_values):
    """Grid-search ARIMA orders; print each RMSE and the best configuration.

    Orders whose fit fails (convergence / non-invertibility errors) are
    skipped silently, mirroring the original best-effort behaviour.
    """
    dataset = dataset.astype('float32')
    best_score, best_cfg = float("inf"), None
    for p in p_values:
        for d in d_values:
            for q in q_values:
                order = (p, d, q)
                try:
                    rmse = evaluate_arima_model(dataset, order)
                except Exception:
                    # a bare `except:` would also swallow KeyboardInterrupt /
                    # SystemExit; only model-fitting failures should be skipped
                    continue
                if rmse < best_score:
                    best_score, best_cfg = rmse, order
                print('ARIMA%s RMSE=%.3f' % (order, rmse))
    print('Best ARIMA%s RMSE=%.3f' % (best_cfg, best_score))

# evaluate parameters
# grid of candidate (p, d, q) orders: 9 * 3 * 7 = 189 combinations
p_values = range(0, 9)
d_values = range(0, 3)
q_values = range(0, 7)
# statsmodels emits many convergence warnings during the grid search
warnings.filterwarnings("ignore")
# `series` is presumably a pandas Series loaded earlier in the script -- TODO confirm
evaluate_models(series.values, p_values, d_values, q_values)
 

Я нашел очень похожий образец кода для модели SARIMAX, но он не включает пошаговую (walk-forward) проверку, описанную выше. Вот второй пример с SARIMAX, который я нашел:

 def evaluate_sarima_model(train, test, arima_order, seasonalOrder):
    try:
        # no need to calcuate if order as well as seasonal differencing is 0
        if (arima_order[1] seasonalOrder[1])==0:
          print(f"##### Skipped modelling with: {arima_order}, {seasonalOrder} --> Both d amp; D are zeroesn")
          # return a high value of RMSE so that it sits at the bottom of the list when sorted
          return 999999999, arima_order, seasonalOrder
          
        y_hat = test.copy() 
        model = SARIMAX(train.Count, order=arima_order, seasonal_order=seasonalOrder)
        model_fit = model.fit()
        predict = model_fit.predict(start="2014-6-25", end="2014-9-25", dynamic=True)
        y_hat['model_prediction']=predict

        error = rmse(test.Count, y_hat.model_prediction)
        print(f"> Model: {error}, {arima_order}, {seasonalOrder}n")
        return error, arima_order, seasonalOrder
    except Exception as e:
        # in case of convergence errors, non-invertible errors, etc.
        print(f"##### Skipped modelling with: {arima_order}, {seasonalOrder}n")
        print(e)
        return 999999999, arima_order, seasonalOrder

def evaluate_models_parallely(train, test, p_values, d_values, q_values, P_values, D_values, Q_values, m_values):
    """Fan the SARIMA grid search out over all available CPU cores with joblib.

    Returns a list containing one list of (rmse, order, seasonal_order) tuples
    (one tuple per hyper-parameter combination).
    """
    # one worker process per available core
    runner = Parallel(n_jobs=cpu_count(), backend='multiprocessing')

    collected = []
    try:
        # lazily build one delayed call per hyper-parameter combination
        jobs = (
            delayed(evaluate_sarima_model)(train, test, (p, d, q), (P, D, Q, m))
            for m in m_values
            for Q in Q_values
            for D in D_values
            for P in P_values
            for q in q_values
            for d in d_values
            for p in p_values
        )
        collected.append(runner(jobs))
    except Exception as e:
        print('Fatal Error....')
        print(e)

    return collected


# specify the range of values we want to try for the different hyperparameters
p_values = np.arange(0, 2)
d_values = np.arange(1, 2)
q_values = np.arange(1, 4)
P_values = np.arange(0, 2)
D_values = np.arange(1, 2)
Q_values = np.arange(0, 3)
m_values = np.arange(7, 8)

# total combinations being tried: 2*1*3*2*1*3*1 = 36

scor = evaluate_models_parallely(train, valid, p_values, d_values, q_values, P_values, D_values, Q_values, m_values)

# flatten the single-element list-of-lists returned by the parallel runner
scores = []
for tup_list in scor:
    for tup in tup_list:
        scores.append(tup)

# sort the results on basis of RMSE scores (ascending)
scores.sort(key=lambda x: x[0])

# lost '\n' escapes in the original print strings restored below
print('\nTop 5 SARIMA params with minimum RMSEs:\n')
for x in scores[:5]:
    print(f'RMSE={x[0]}  order={x[1]}  seasonal_order={x[2]}\n')

print("DONE!")
 

My attempts to merge the two have failed and I am at a bit of a loss for next steps. Any insight would be highly appreciated. I hope I have included everything needed to help. From having read through some examples of joblib, I understand I need to wrap the nested loop in the first evaluate_models() function into a delayed function.

How can I wrap the nested loop within a joblib delayed function to run it in parallel?