#python #parallel-processing #time-series #arima #joblib
Вопрос:
Я пытаюсь с помощью joblib изменить следующий код так, чтобы он мог выполняться параллельно на многоядерном процессоре, арендованном на vast.ai.
# Create a differenced series
def difference(dataset, interval=1):
    """Return the lag-``interval`` differences of ``dataset`` as a numpy array.

    Element ``k`` of the result is ``dataset[k + interval] - dataset[k]``.
    When ``dataset`` is not longer than ``interval`` the result is empty.

    Args:
        dataset: Indexable, sized sequence of numbers (e.g. a list or array).
        interval: Distance between the two observations being subtracted.

    Returns:
        numpy.ndarray holding the differenced values.
    """
    return numpy.array(
        [dataset[i] - dataset[i - interval] for i in range(interval, len(dataset))]
    )
# invert differenced value
def inverse_difference(history, yhat, interval=1):
    """Undo a lag-``interval`` difference for a single value.

    Args:
        history: Sequence of observations on the original scale; the value
            ``interval`` positions from the end is used as the base.
        yhat: Value on the differenced scale.
        interval: Lag that was used when differencing.

    Returns:
        ``yhat`` plus ``history[-interval]``.
    """
    return yhat + history[-interval]
# evaluate an ARIMA model for a given order (p,d,q) and return RMSE
def evaluate_arima_model(X, arima_order):
    """Walk-forward validate one ARIMA order and return its RMSE.

    The first half of ``X`` seeds the history. Each remaining observation is
    forecast one step ahead by a model fitted on the lag-7 differenced
    history; the true observation is then appended to the history before the
    next step.

    Args:
        X: Array of observations; must support ``astype`` and slicing.
        arima_order: ``(p, d, q)`` tuple passed to ``ARIMA`` as ``order``.

    Returns:
        Square root of ``mean_squared_error`` between the held-out half of
        ``X`` and the one-step forecasts.
    """
    # Lag used for differencing; it never changes, so set it once up front.
    days_in_week = 7

    # prepare training dataset
    X = X.astype('float32')
    train_size = int(len(X) * 0.50)
    train, test = X[0:train_size], X[train_size:]
    history = list(train)

    # make predictions
    predictions = []
    for observed in test:
        # difference data
        diff = difference(history, days_in_week)
        model = ARIMA(diff, order=arima_order)
        # NOTE(review): ``trend=``/``disp=`` look like the legacy statsmodels
        # ARIMA ``fit`` signature - confirm against the installed version.
        model_fit = model.fit(trend='nc', disp=0)
        yhat = model_fit.forecast()[0]
        # Bring the forecast back to the original (undifferenced) scale.
        yhat = inverse_difference(history, yhat, days_in_week)
        predictions.append(yhat)
        history.append(observed)

    # calculate out of sample error
    return sqrt(mean_squared_error(test, predictions))
# evaluate combinations of p, d and q values for an ARIMA model
def evaluate_models(dataset, p_values, d_values, q_values):
    """Grid-search ARIMA orders and print the RMSE of each and of the best.

    Every ``(p, d, q)`` combination is scored with ``evaluate_arima_model``.
    Combinations whose evaluation raises are skipped silently so one failing
    order does not abort the search.

    Args:
        dataset: Array of observations; must support ``astype``.
        p_values: Iterable of candidate ``p`` values.
        d_values: Iterable of candidate ``d`` values.
        q_values: Iterable of candidate ``q`` values.

    Returns:
        None. Results are reported through ``print``.
    """
    from itertools import product

    dataset = dataset.astype('float32')
    best_score, best_cfg = float("inf"), None
    # product() yields tuples in the same order as nested p -> d -> q loops.
    for order in product(p_values, d_values, q_values):
        try:
            rmse = evaluate_arima_model(dataset, order)
        except Exception:
            # Deliberate best-effort: skip orders that fail to evaluate.
            # ``Exception`` (not a bare ``except``) keeps Ctrl-C working.
            continue
        else:
            if rmse < best_score:
                best_score, best_cfg = rmse, order
            print('ARIMA%s RMSE=%.3f' % (order, rmse))
    print('Best ARIMA%s RMSE=%.3f' % (best_cfg, best_score))
# evaluate parameters
# Candidate orders: p in 0..8, d in 0..2, q in 0..6.
p_values, d_values, q_values = range(9), range(3), range(7)
# Suppress all warnings before starting the search.
warnings.filterwarnings("ignore")
evaluate_models(series.values, p_values, d_values, q_values)
Я нашёл очень похожий пример кода для модели SARIMAX, но в нём нет пошаговой (walk-forward) проверки, описанной выше. Вот этот второй пример для SARIMAX:
def evaluate_sarima_model(train, test, arima_order, seasonalOrder):
    """Fit one SARIMAX configuration on ``train`` and score it on ``test``.

    Any exception raised while fitting or scoring is caught, printed, and
    reported through the sentinel error value, so a failing configuration
    does not abort a larger search.

    Args:
        train: Training data exposing a ``Count`` attribute.
        test: Hold-out data exposing ``Count`` and ``copy()``.
        arima_order: ``(p, d, q)`` tuple passed as ``order``.
        seasonalOrder: Tuple passed as ``seasonal_order``; index 1 is read
            as the seasonal differencing term.

    Returns:
        ``(error, arima_order, seasonalOrder)``. ``error`` is the value
        returned by ``rmse``, or 999999999 when the configuration was
        skipped or failed.
    """
    # return a high value of RMSE so that it sits at the bottom of the list when sorted
    skipped = (999999999, arima_order, seasonalOrder)
    try:
        # no need to calculate if order as well as seasonal differencing is 0
        if (arima_order[1] + seasonalOrder[1]) == 0:
            print(f"##### Skipped modelling with: {arima_order}, {seasonalOrder} --> Both d & D are zeroes\n")
            return skipped
        y_hat = test.copy()
        model = SARIMAX(train.Count, order=arima_order, seasonal_order=seasonalOrder)
        model_fit = model.fit()
        # NOTE(review): the prediction window is hard-coded; presumably it
        # matches the date span of ``test`` - confirm against the caller.
        predict = model_fit.predict(start="2014-6-25", end="2014-9-25", dynamic=True)
        y_hat['model_prediction'] = predict
        error = rmse(test.Count, y_hat.model_prediction)
        print(f"> Model: {error}, {arima_order}, {seasonalOrder}\n")
        return error, arima_order, seasonalOrder
    except Exception as e:
        # in case of convergence errors, non-invertible errors, etc.
        print(f"##### Skipped modelling with: {arima_order}, {seasonalOrder}\n")
        print(e)
        return skipped
def evaluate_models_parallely(train, test, p_values, d_values, q_values, P_values, D_values, Q_values, m_values):
    """Score every SARIMA order combination through a ``Parallel`` executor.

    One ``evaluate_sarima_model`` task is created per
    ``(p, d, q) x (P, D, Q, m)`` combination and the whole batch is handed
    to the executor in a single call.

    Args:
        train: Training data forwarded to ``evaluate_sarima_model``.
        test: Hold-out data forwarded to ``evaluate_sarima_model``.
        p_values, d_values, q_values: Candidate non-seasonal order terms.
        P_values, D_values, Q_values, m_values: Candidate seasonal order terms.

    Returns:
        A list holding one element, the result of the executor call. The
        list is empty if building or running the tasks raised, in which
        case the error is printed.
    """
    from itertools import product

    # utilize all available cores using n_jobs = cpu_count()
    executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
    scor = []
    try:
        # call our function in a parallel manner; product() keeps the
        # original nesting order (m outermost, p innermost)
        tasks = (
            delayed(evaluate_sarima_model)(train, test, (p, d, q), (P, D, Q, m))
            for m, Q, D, P, q, d, p in product(
                m_values, Q_values, D_values, P_values, q_values, d_values, p_values
            )
        )
        results = executor(tasks)
        scor.append(results)
    except Exception as e:
        print('Fatal Error....')
        print(e)
    return scor
# specify the range of values we want to try for the different hyperparameters
p_values = np.arange(0, 2)
d_values = np.arange(1, 2)
q_values = np.arange(1, 4)
P_values = np.arange(0, 2)
D_values = np.arange(1, 2)
Q_values = np.arange(0, 3)
m_values = np.arange(7, 8)
# total combinations being tried: 2*1*3*2*1*3*1 = 36
# NOTE(review): with a process-based backend on platforms that spawn workers,
# this driver presumably needs an ``if __name__ == "__main__":`` guard - confirm.
scor = evaluate_models_parallely(train, valid, p_values, d_values, q_values, P_values, D_values, Q_values, m_values)
# flatten the list of result lists into a single list of result tuples
scores = [tup for tup_list in scor for tup in tup_list]
# sort the results on basis of RMSE scores (ascending)
scores.sort(key=lambda x: x[0])
print('\nTop 5 SARIMA params with minimum RMSEs:\n')
for x in scores[:5]:
    print(f'RMSE={x[0]} order={x[1]} seasonal_order={x[2]}\n')
print("DONE!")
My attempts to merge the two have failed, and I am at a bit of a loss for next steps. Any insight would be highly appreciated. I hope I have included everything needed to help. Having read through some examples of joblib, I understand that I need to wrap the nested loop in the first evaluate_models() function into a delayed function.
How can I wrap the nested loop within a joblib delayed function to run it in parallel?