#python #parallel-processing #dask
Вопрос:
Я пытаюсь реализовать приведенный ниже код для параллельного запуска, но он, похоже, работает довольно медленно. Был бы признателен, если бы были какие-либо отзывы. Похоже, у него столько же времени, сколько и у обычного цикла
import pandas as pd
import numpy as np
import dask
import statsmodels.api as sm
dates = pd.date_range('1995-12-31', periods=480, freq='W', name='Date')
df = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df["CUSIP"] = "20"
df2 = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df2["CUSIP"] = "21"
weekly = pd.concat([df2,df])
def olsreg_beta(yvar, xvars):
Ygrp = yvar
Xgrp = sm.add_constant(xvars)
reg = sm.OLS(Ygrp, Xgrp).fit()
return reg.params[1]
beta = pd.DataFrame(columns=["date", "CUSIP", "beta"])
for cusip in weekly.CUSIP.unique().tolist():
length = len(weekly[weekly.CUSIP == cusip])
if length < 156:
continue
for i in range(length 1):
yvar = weekly[weekly.CUSIP == cusip].iloc[i:156 i]["ret"].values
xvar = weekly[weekly.CUSIP == cusip].iloc[i:156 i]["mkt_ret"].values
beta_param = dask.delayed(olsreg_beta)(yvar, xvar)
date = weekly[weekly.CUSIP == cusip].iloc[156 i, 1]
beta.append({"date": date, "CUSIP": cusip, "beta": beta_param}, ignore_index=True)
print(i)
results = dask.compute(*beta)
Комментарии:
1. хорошо, почему бы вам не использовать многопоточность для параллельного выполнения
Ответ №1:
Вы пересчитываете много мыслей несколько раз без всяких причин (например weekly.CUSIP == cusip
) и используете очень медленное приложение Pandas, которое создает новый фрейм данных для каждой новой строки. Лучше использовать groupby
Панд и перебирать группы, чтобы избежать квадратичной сложности. Вот более быстрый (непроверенный) код:
import pandas as pd
import numpy as np
import dask
import statsmodels.api as sm
dates = pd.date_range('1995-12-31', periods=480, freq='W', name='Date')
df = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df["CUSIP"] = "20"
df2 = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df2["CUSIP"] = "21"
weekly = pd.concat([df2,df])
def olsreg_beta(yvar, xvars):
Ygrp = yvar
Xgrp = sm.add_constant(xvars)
reg = sm.OLS(Ygrp, Xgrp).fit()
return reg.params[1]
betaLines = []
for cusip, filteredWeeks in weekly.groupby("CUSIP"):
length = len(filteredWeeks)
if length < 156:
continue
for i in range(length 1):
yvar = filteredWeeks.iloc[i:156 i]["ret"].values
xvar = filteredWeeks.iloc[i:156 i]["mkt_ret"].values
beta_param = dask.delayed(olsreg_beta)(yvar, xvar)
date = filteredWeeks.iloc[156 i, 1]
betaLines.append({"date": date, "CUSIP": cusip, "beta": "beta_param"})
print(i)
beta = pd.DataFrame(betaLines)
results = dask.compute(*beta)