#r #parallel-processing #quantstrat #mclapply
#r #параллельная обработка #quantstrat #mclapply
Вопрос:
Я бы хотел распараллелить quantstrat. Мой код не совсем такой, но это демонстрирует проблему. Проблема, я полагаю, в том, что .blotter env инициализируется адресом памяти указателя, и я не могу инициализировать массив / матрицу new.env().
Что я хотел бы сделать, так это заменить цикл for на mclapply, чтобы я мог запускать несколько стратегий приложений с различными датами / символами (здесь показаны только различные символы). Моя конечная цель — кластер beowulf (makeCluster), и я планирую запускать их параллельно, используя до 252 торговых дней (скользящее окно) с различными символами за итерацию (но мне все это не нужно. Я просто спрашиваю, есть ли способ работать с назначением portfolio и последующего объекта .blotter memory таким образом, чтобы я мог использовать mclapply)
#Load quantstrat in your R environment.
rm(list = ls())
local()
library(quantstrat)
library(parallel)
# The search command lists all attached packages.
search()
symbolstring1 <- c('QQQ','GOOG')
#symbolstring <- c('QQQ','GOOG')
#for(i in 1:length(symbolstring1))
mlapply(symbolstring1, function(symbolstring)
{
#local()
#i=2
#symbolstring=as.character(symbolstring1[i])
.blotter <- new.env()
.strategy <- new.env()
try(rm.strat(strategyName),silent=TRUE)
try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
print(symbolstring)
currency('USD')
stock(symbolstring,currency='USD',multiplier=1)
# Currency and trading instrument objects stored in the
# .instrument environment
print("FI")
ls(envir=FinancialInstrument:::.instrument)
# blotter functions used for instrument initialization
# quantstrat creates a private storage area called .strategy
ls(all=T)
# The initDate should be lower than the startDate. The initDate will be used later while initializing the strategy.
initDate <- '2010-01-01'
startDate <- '2011-01-01'
endDate <- '2019-08-10'
init_equity <- 50000
# Set UTC TIME
Sys.setenv(TZ="UTC")
getSymbols(symbolstring,from=startDate,to=endDate,adjust=TRUE,src='yahoo')
# Define names for portfolio, account and strategy.
#portfolioName <- accountName <- strategyName <- "FirstPortfolio"
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",symbolstring)
print(portfolioName)
# The function rm.strat removes any strategy, portfolio, account, or order book object with the given name. This is important
#rm.strat(strategyName)
print("port")
initPortf(name = portfolioName,
symbols = symbolstring,
initDate = initDate)
initAcct(name = accountName,
portfolios = portfolioName,
initDate = initDate,
initEq = init_equity)
initOrders(portfolio = portfolioName,
symbols = symbolstring,
initDate = initDate)
# name: the string name of the strategy
# assets: optional list of assets to apply the strategy to.
# Normally these are defined in the portfolio object
# contstrains: optional portfolio constraints
# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)
ls(all=T)
# .blotter holds the portfolio and account object
ls(.blotter)
# .strategy holds the orderbook and strategy object
print(ls(.strategy))
print("ind")
add.indicator(strategy = strategyName,
name = "EMA",
arguments = list(x = quote(Cl(mktdata)),
n = 10), label = "nFast")
add.indicator(strategy = strategyName,
name = "EMA",
arguments = list(x = quote(Cl(mktdata)),
n = 30),
label = "nSlow")
# Add long signal when the fast EMA crosses over slow EMA.
print("sig")
add.signal(strategy = strategyName,
name="sigCrossover",
arguments = list(columns = c("nFast", "nSlow"),
relationship = "gte"),
label = "longSignal")
# Add short signal when the fast EMA goes below slow EMA.
add.signal(strategy = strategyName,
name = "sigCrossover",
arguments = list(columns = c("nFast", "nSlow"),
relationship = "lt"),
label = "shortSignal")
# go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)
print("rul")
add.rule(strategyName,
name= "ruleSignal",
arguments=list(sigcol="longSignal",
sigval=TRUE,
orderqty=100,
ordertype="market",
orderside="long",
replace = TRUE,
TxnFees = -10),
type="enter",
label="EnterLong")
# go short when 10-period EMA (nFast) < 30-period EMA (nSlow)
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "shortSignal",
sigval = TRUE,
orderside = "short",
ordertype = "market",
orderqty = -100,
TxnFees = -10,
replace = TRUE),
type = "enter",
label = "EnterShort")
# Close long positions when the shortSignal column is True
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "shortSignal",
sigval = TRUE,
orderside = "long",
ordertype = "market",
orderqty = "all",
TxnFees = -10,
replace = TRUE),
type = "exit",
label = "ExitLong")
# Close Short positions when the longSignal column is True
add.rule(strategyName,
name = "ruleSignal",
arguments = list(sigcol = "longSignal",
sigval = TRUE,
orderside = "short",
ordertype = "market",
orderqty = "all",
TxnFees = -10,
replace = TRUE),
type = "exit",
label = "ExitShort")
print("summary")
summary(getStrategy(strategyName))
# Summary results are produced below
print("results")
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName,symbols=symbolstring)
# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
getTxns(Portfolio=portfolioName, Symbol=symbolstring)
mktdata
updatePortf(portfolioName)
dateRange <- time(getPortfolio(portfolioName)$summary)[-1]
updateAcct(portfolioName,dateRange)
updateEndEq(accountName)
print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = "Portfolio Equity"))
#cleanup
for (name in symbolstring) rm(list = name)
#rm(.blotter)
rm(.stoploss)
rm(.txnfees)
#rm(.strategy)
rm(symbols)
}
)
Но выдается ошибка
Ошибка в get (символ, envir = envir): объект ‘QQQ’ не найден
В частности, проблема в FinancialInstrument :::.instrument указывает на адрес памяти, который не обновляется при вызовах моих инкапсулированных переменных (symbolstring)
Комментарии:
1. Не уверен, где и как происходит распараллеливание в вашем примере кода, если вы контролируете ситуацию, попробуйте с future.apply .
Ответ №1:
apply.paramset
в quantstrat
уже используется foreach
конструкция для распараллеливания выполнения applyStrategy
.
apply.paramset
необходимо проделать достаточный объем работы, чтобы убедиться, что рабочие среды доступны для выполнения работы, и собрать надлежащие результаты для отправки их обратно вызывающему процессу.
Самое простое, что вам, вероятно, можно было бы сделать, это использовать apply.paramset
. Задайте параметры даты и символов и обеспечьте нормальную работу функции.
В качестве альтернативы, я предлагаю вам посмотреть на шаги, необходимые для использования параллельной foreach
конструкции в apply.paramset
, чтобы изменить ее в соответствии с вашим предложенным случаем.
Также обратите внимание, что в вашем вопросе задается вопрос об использовании кластера Beowulf и mclapply
. Это не сработает. mclapply
работает только в одном пространстве памяти. Кластеры Beowulf обычно не используют одну память и пространство процесса. Обычно они распределяют задания через параллельные библиотеки, такие как MPI. apply.paramset
уже можно распространять в кластере Beowulf, используя doMPI
серверную часть для foreach
. Это одна из причин, по которой мы использовали foreach
: множество различных доступных параллельных серверных систем. doMC
Серверная часть для foreach
фактически использует mclapply
за кулисами.
Комментарии:
1. Это точно не решает проблему. Я смог заставить mclapply работать (и собираюсь ответить на свой собственный вопрос). Я уточнил, что мой вопрос был не «как сделать beowulf». Просто это конечная цель. В настоящее время я хотел бы заставить mclapply работать. Пока перемещение стандартных объектов за пределы цикла, похоже, работает. Правильной функцией для использования с makeCluster является clusterApply[LB]. Что касается paramset. Насколько я понимаю, это изменяет определенные параметры в сигналах, не обязательно запасы и даты, но я могу ошибаться. Если в вашем ответе приведен пример использования приведенного выше кода с использованием paramset, я мог бы пометить его как «отвеченный»
2. Я беру свои слова обратно. Помещение стандартных объектов за пределы mclapply создает проблему с датой. Quantstrat загружает все даты, которые есть в объекте. Единственный способ выполнить итерацию по разным датам — это удалить объект и повторно заполнить его другими датами. Итак, проблема, похоже, в общей памяти имен объектов
3. Спасибо за ответ. Я собираюсь переварить это: «Вероятно, самым простым для вас было бы использовать apply.paramset . Задайте параметры даты и символов и обеспечьте нормальную работу функции.»
4. Мои извинения, Брайан. Я совершенно не знал, что разговариваю с автором инструмента, о котором задавал вопросы. Я все еще пытаюсь разобраться, как использовать paramset
5. Я вижу mclapply и любой пакет mc доступен только для чтения и не может обновлять переменные общей памяти, что является частью кривой обучения, с которой я имею дело.
Ответ №2:
Я считаю, что это распараллеливает код. Я заменил индикаторы так же, как и символы, но логика использования разных символов и дат присутствует
В основном я добавил
Dates=paste0(startDate,"::",endDate)
rm(list = ls())
library(lubridate)
library(parallel)
autoregressor1 = function(x){
if(NROW(x)<12){ result = NA} else{
y = Vo(x)*Ad(x)
#y = ROC(Ad(x))
y = ROC(y)
y = na.omit(y)
step1 = ar.yw(y)
step2 = predict(step1,newdata=y,n.ahead=1)
step3 = step2$pred[1] 1
step4 = (step3*last(Ad(x))) - last(Ad(x))
result = step4
}
return(result)
}
autoregressor = function(x){
ans = rollapply(x,26,FUN = autoregressor1,by.column=FALSE)
return (ans)}
########################indicators#############################
library(quantstrat)
library(future.apply)
library(scorecard)
reset_quantstrat <- function() {
if (! exists(".strategy")) .strategy <<- new.env(parent = .GlobalEnv)
if (! exists(".blotter")) .blotter <<- new.env(parent = .GlobalEnv)
if (! exists(".audit")) .audit <<- new.env(parent = .GlobalEnv)
suppressWarnings(rm(list = ls(.strategy), pos = .strategy))
suppressWarnings(rm(list = ls(.blotter), pos = .blotter))
suppressWarnings(rm(list = ls(.audit), pos = .audit))
FinancialInstrument::currency("USD")
}
reset_quantstrat()
initDate <- '2010-01-01'
endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(3)
symbolstring1 <- c('SSO','GOLD')
getSymbols(symbolstring1,from=startDate,to=endDate,adjust=TRUE,src='yahoo')
#symbolstring1 <- c('SP500TR','GOOG')
.orderqty <- 1
.txnfees <- 0
#random <- sample(1:2, 2, replace=FALSE)
random <- (1:2)
equity <- lapply(random, function(x)
{#x=1
try(rm("account.Snazzy","portfolio.Snazzy",pos=.GlobalEnv$.blotter),silent=TRUE)
rm(.blotter)
rm(.strategy)
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",x 2)
#endDate <- as.Date(Sys.Date())
startDate <- endDate %m-% years(1 x)
#Load quantstrat in your R environment.
reset_quantstrat()
# The search command lists all attached packages.
search()
symbolstring=as.character(symbolstring1[x])
print(symbolstring)
try(rm.strat(strategyName),silent=TRUE)
try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
print(symbolstring)
currency('USD')
stock(symbolstring,currency='USD',multiplier=1)
# Currency and trading instrument objects stored in the
# .instrument environment
print("FI")
ls(envir=FinancialInstrument:::.instrument)
# blotter functions used for instrument initialization
# quantstrat creates a private storage area called .strategy
ls(all=T)
init_equity <- 10000
Sys.setenv(TZ="UTC")
print(portfolioName)
print("port")
try(initPortf(name = portfolioName,
symbols = symbolstring,
initDate = initDate))
try(initAcct(name = accountName,
portfolios = portfolioName,
initDate = initDate,
initEq = init_equity))
try(initOrders(portfolio = portfolioName,
symbols = symbolstring,
initDate = initDate))
# name: the string name of the strategy
# assets: optional list of assets to apply the strategy to.
# Normally these are defined in the portfolio object
# contstrains: optional portfolio constraints
# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)
ls(all=T)
# .blotter holds the portfolio and account object
ls(.blotter)
# .strategy holds the orderbook and strategy object
print(ls(.strategy))
print("ind")
#ARIMA
add.indicator(
strategy = strategyName,
name = "autoregressor",
arguments = list(
x = quote(mktdata)),
label = "arspread")
################################################ Signals #############################
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = 0.25,
column = "arspread",
relationship = "gte",
cross = TRUE),
label = "Selltime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = 0.1,
column = "arspread",
relationship = "lt",
cross = TRUE),
label = "cashtime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = -0.1,
column = "arspread",
relationship = "gt",
cross = TRUE),
label = "cashtime")
add.signal(
strategy = strategyName,
name = "sigThreshold",
arguments = list(
threshold = -0.25,
column = "arspread",
relationship = "lte",
cross = TRUE),
label = "Buytime")
######################################## Rules #################################################
#Entry Rule Long
add.rule(strategyName,
name = "ruleSignal",
arguments = list(
sigcol = "Buytime",
sigval = TRUE,
orderqty = .orderqty,
ordertype = "market",
orderside = "long",
pricemethod = "market",
replace = TRUE,
TxnFees = -.txnfees
#,
#osFUN = osMaxPos
),
type = "enter",
path.dep = TRUE,
label = "Entry")
#Entry Rule Short
add.rule(strategyName,
name = "ruleSignal",
arguments = list(
sigcol = "Selltime",
sigval = TRUE,
orderqty = .orderqty,
ordertype = "market",
orderside = "short",
pricemethod = "market",
replace = TRUE,
TxnFees = -.txnfees
#,
#osFUN = osMaxPos
),
type = "enter",
path.dep = TRUE,
label = "Entry")
#Exit Rules
print("summary")
summary(getStrategy(strategyName))
# Summary results are produced below
print("results")
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName)
# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below
getTxns(Portfolio=portfolioName, Symbol=symbolstring)
mktdata
updatePortf(portfolioName,Dates=paste0(startDate,"::",endDate))
dateRange <- time(getPortfolio(portfolioName)$summary)
updateAcct(portfolioName,dateRange[which(dateRange >= startDate amp; dateRange <= endDate)])
updateEndEq(accountName, Dates=paste0(startDate,"::",endDate))
print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = symbolstring))
tStats <- tradeStats(Portfolios = portfolioName, use="trades", inclZeroDays=FALSE,Dates=paste0(startDate,"::",endDate))
final_acct <- getAccount(portfolioName)
#final_acct
#View(final_acct)
options(width=70)
print(plot(tail(final_acct$summary$End.Eq,-1), main = symbolstring))
#dev.off()
tail(final_acct$summary$End.Eq)
rets <- PortfReturns(Account = accountName)
#rownames(rets) <- NULL
tab.perf <- table.Arbitrary(rets,
metrics=c(
"Return.cumulative",
"Return.annualized",
"SharpeRatio.annualized",
"CalmarRatio"),
metricsNames=c(
"Cumulative Return",
"Annualized Return",
"Annualized Sharpe Ratio",
"Calmar Ratio"))
tab.perf
tab.risk <- table.Arbitrary(rets,
metrics=c(
"StdDev.annualized",
"maxDrawdown"
),
metricsNames=c(
"Annualized StdDev",
"Max DrawDown"))
tab.risk
return (as.numeric(tail(final_acct$summary$End.Eq,1))-init_equity)
#reset_quantstrat()
}
)
похоже, что он распараллелен, но он не обновляет init_equity корректно