Медленный параллельный запуск с использованием Dask для цикла

#python #parallel-processing #dask

Вопрос:

Я пытаюсь реализовать приведенный ниже код для параллельного запуска, но он, похоже, работает довольно медленно. Был бы признателен, если бы были какие-либо отзывы. Похоже, у него столько же времени, сколько и у обычного цикла

 import pandas as pd
import numpy as np
import dask
import statsmodels.api as sm

dates = pd.date_range('1995-12-31', periods=480, freq='W', name='Date')
df = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df["CUSIP"] = "20"
df2 = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df2["CUSIP"] = "21"
weekly = pd.concat([df2,df])

def olsreg_beta(yvar, xvars):
    Ygrp = yvar
    Xgrp = sm.add_constant(xvars)
    reg = sm.OLS(Ygrp, Xgrp).fit()
    return reg.params[1]

beta = pd.DataFrame(columns=["date", "CUSIP", "beta"])

for cusip in weekly.CUSIP.unique().tolist():
    length = len(weekly[weekly.CUSIP == cusip])
    if  length < 156:
        continue
    for i in range(length 1):
        yvar = weekly[weekly.CUSIP == cusip].iloc[i:156 i]["ret"].values
        xvar = weekly[weekly.CUSIP == cusip].iloc[i:156 i]["mkt_ret"].values
        beta_param = dask.delayed(olsreg_beta)(yvar, xvar)
        date = weekly[weekly.CUSIP == cusip].iloc[156 i, 1]
        beta.append({"date": date, "CUSIP": cusip, "beta": beta_param}, ignore_index=True)
        print(i)

results = dask.compute(*beta)
 

Комментарии:

1. хорошо, почему бы вам не использовать многопоточность для параллельного выполнения

Ответ №1:

Вы пересчитываете много мыслей несколько раз без всяких причин (например weekly.CUSIP == cusip ) и используете очень медленное приложение Pandas, которое создает новый фрейм данных для каждой новой строки. Лучше использовать groupby Панд и перебирать группы, чтобы избежать квадратичной сложности. Вот более быстрый (непроверенный) код:

 import pandas as pd
import numpy as np
import dask
import statsmodels.api as sm

dates = pd.date_range('1995-12-31', periods=480, freq='W', name='Date')
df = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df["CUSIP"] = "20"
df2 = pd.DataFrame(np.random.rand(480, 2), dates, ["ret", "mkt_ret"])
df2["CUSIP"] = "21"
weekly = pd.concat([df2,df])

def olsreg_beta(yvar, xvars):
    Ygrp = yvar
    Xgrp = sm.add_constant(xvars)
    reg = sm.OLS(Ygrp, Xgrp).fit()
    return reg.params[1]

betaLines = []

for cusip, filteredWeeks in weekly.groupby("CUSIP"):
    length = len(filteredWeeks)
    if  length < 156:
        continue
    for i in range(length 1):
        yvar = filteredWeeks.iloc[i:156 i]["ret"].values
        xvar = filteredWeeks.iloc[i:156 i]["mkt_ret"].values
        beta_param = dask.delayed(olsreg_beta)(yvar, xvar)
        date = filteredWeeks.iloc[156 i, 1]
        betaLines.append({"date": date, "CUSIP": cusip, "beta": "beta_param"})
        print(i)

beta = pd.DataFrame(betaLines)

results = dask.compute(*beta)