Adding a rolling optimization with an N-month lookback window in Vectorbt


I am working on backtesting code for a maximum-Sharpe portfolio.

Here I am using PyPortfolioOpt instead of cvxpy to compute the weights; however, I am confused about where to configure the number of months in the lookback period for this optimization, e.g. a rolling 36-month optimization.

I believe the lookback window should be set here, but I am not quite sure how (see the sketch after the snippet below).

def pre_segment_func_nb(c, find_weights_nb, history_len, ann_factor, num_tests, srb_sharpe):
    if history_len == -1:
        # Look back at the entire time period
        close = c.close[:c.i, c.from_col:c.to_col]
    else:
        # Look back at a fixed time period
        if c.i - history_len <= 0:
            return (np.full(c.group_len, np.nan),)  # insufficient data
        close = c.close[c.i - history_len:c.i, c.from_col:c.to_col]

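For context, history_len in this function is counted in rows of c.close, i.e. in daily bars, and it is supplied through pre_segment_args in the full listing below (where it is currently -1, meaning the entire available history). A minimal sketch of the month-to-bar conversion, assuming roughly 21 trading days per month (an approximation, not something fixed by the code):

# A 36-month rolling window expressed in daily bars
# (assumption: ~21 trading days per month; adjust to your data frequency)
months = 36
bars_per_month = 21
history_len = months * bars_per_month  # 756 bars
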
Here is the full code for your reference.

import os
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime
import pytz
from numba import njit
import matplotlib.pyplot as plt
import seaborn as sns

from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
from pypfopt import base_optimizer
from pypfopt import objective_functions
from pypfopt.discrete_allocation import DiscreteAllocation, get_latest_prices
from pypfopt import EfficientSemivariance
from pypfopt.expected_returns import mean_historical_return
from pypfopt.expected_returns import returns_from_prices

import vectorbt as vbt
from vectorbt.generic.nb import nanmean_nb
from vectorbt.portfolio.nb import order_nb, sort_call_seq_nb
from vectorbt.portfolio.enums import SizeType, Direction
from IPython.display import set_matplotlib_formats
%matplotlib inline
%config InlineBackend.figure_format='retina'

symbols = ['NVDA','AMD','TSLA','NET','JPM','AAPL']
start_date = datetime(2012, 1, 1, tzinfo=pytz.utc)
end_date = datetime.today().strftime('%Y-%m-%d')

num_tests = 1000000

vbt.settings.array_wrapper['freq'] = 'days'
vbt.settings.returns['year_freq'] = '252 days'
vbt.settings.portfolio['seed'] = 42
vbt.settings.portfolio.stats['incl_unrealized'] = True

yfdata = vbt.YFData.download(symbols, start=start_date, end=end_date)
ohlcv = yfdata.concat()
price = ohlcv['Close'].fillna(method='ffill')


srb_sharpe = np.full(price.shape[0], np.nan)

@njit
def pre_sim_func_nb(c, every_nth):
    # Define rebalancing days
    c.segment_mask[:, :] = False
    c.segment_mask[every_nth::every_nth, :] = True
    return ()

@njit
def find_weights_nb(c, price, num_tests):
    # Find optimal weights based on best Sharpe ratio
    returns = (price[1:] - price[:-1]) / price[:-1]
    returns = returns[1:, :]  # cannot compute np.cov with NaN
    mean = nanmean_nb(returns)
    cov = np.cov(returns, rowvar=False)  # masked arrays not supported by Numba (yet)
    best_sharpe_ratio = -np.inf
    weights = np.full(c.group_len, np.nan, dtype=np.float_)
    
    for i in range(num_tests):
        # Generate weights
        w = np.random.random_sample(c.group_len)
        w = w / np.sum(w)
        
        # Compute annualized mean, covariance, and Sharpe ratio
        p_return = np.sum(mean * w) * ann_factor
        p_std = np.sqrt(np.dot(w.T, np.dot(cov, w))) * np.sqrt(ann_factor)
        sharpe_ratio = p_return / p_std
        if sharpe_ratio > best_sharpe_ratio:
            best_sharpe_ratio = sharpe_ratio
            weights = w
            
    return best_sharpe_ratio, weights

@njit
def pre_segment_func_nb(c, find_weights_nb, history_len, ann_factor, num_tests, srb_sharpe):
    if history_len == -1:
        # Look back at the entire time period
        close = c.close[:c.i, c.from_col:c.to_col]
    else:
        # Look back at a fixed time period
        if c.i - history_len <= 0:
            return (np.full(c.group_len, np.nan),)  # insufficient data
        close = c.close[c.i - history_len:c.i, c.from_col:c.to_col]

    # Find optimal weights
    best_sharpe_ratio, weights = find_weights_nb(c, close, num_tests)
    srb_sharpe[c.i] = best_sharpe_ratio

    # Update valuation price and reorder orders
    size_type = SizeType.TargetPercent
    direction = Direction.LongOnly
    order_value_out = np.empty(c.group_len, dtype=np.float_)
    for k in range(c.group_len):
        col = c.from_col + k
        c.last_val_price[col] = c.close[c.i, col]
    sort_call_seq_nb(c, weights, size_type, direction, order_value_out)

    return (weights,)

@njit
def order_func_nb(c, weights):
    col_i = c.call_seq_now[c.call_idx]
    return order_nb(
        weights[col_i], 
        c.close[c.i, c.col],
        size_type=SizeType.TargetPercent
    )

returns = price.pct_change()  # daily returns; needed to derive the annualization factor
ann_factor = returns.vbt.returns.ann_factor


def pyopt_find_weights(sc, price, num_tests):  # no @njit decorator = it's a pure Python function
    price = pd.DataFrame(price, columns=symbols)
    avg_returns = expected_returns.mean_historical_return(price)
    cov_mat = risk_models.CovarianceShrinkage(price).ledoit_wolf()

    ef = EfficientFrontier(avg_returns, cov_mat, weight_bounds=(0,1))
    
    min_weight, max_weight = 0.05, 0.35
    constraints=[
            # {"type": "eq", "fun": lambda w: np.sum(w) - 1},  # sum to 1
            {"type": "ineq", "fun": lambda w: w - min_weight},  # greater than min_weight
            {"type": "ineq", "fun": lambda w: max_weight - w},  # less than max_weight
        ]
    
    weights = ef.nonconvex_objective(
        objective_functions.sharpe_ratio,
        objective_args=(avg_returns, cov_mat),
        weights_sum_to_one=True,
        constraints = constraints
    )
    
    clean_weights = ef.clean_weights()
    weights = np.array([clean_weights[symbol] for symbol in symbols])
    best_sharpe_ratio = base_optimizer.portfolio_performance(weights, avg_returns, cov_mat)[2]
    latest_prices = get_latest_prices(price)
    da = DiscreteAllocation(clean_weights, latest_prices, total_portfolio_value=25000)
    allocation, leftover = da.lp_portfolio(reinvest=True)
  
    w = pd.DataFrame(clean_weights, columns=clean_weights.keys(), index = [0])
    w.to_excel('w.xlsx')
    return best_sharpe_ratio, weights

pyopt_srb_sharpe = np.full(price.shape[0], np.nan)

pyopt_srb_pf = vbt.Portfolio.from_order_func(
    price,
    order_func_nb,
    pre_sim_func_nb=pre_sim_func_nb,
    pre_sim_args=(63,), #63 #84
    pre_segment_func_nb=pre_segment_func_nb.py_func,  # run pre_segment_func_nb as pure Python function
    pre_segment_args=(pyopt_find_weights, -1, ann_factor, num_tests, pyopt_srb_sharpe),
    cash_sharing=True, 
    group_by=True,
    use_numba=False  # run simulate_nb as pure Python function
)
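
For a rolling 36-month optimization, the only change needed in the call above would be the second element of pre_segment_args: replace -1 (full history) with the bar count computed earlier. A hedged sketch, keeping everything else identical (pyopt_srb_pf_rolling is just an illustrative name):

history_len = 36 * 21  # assumed ~21 trading days per month -> 756 bars

pyopt_srb_pf_rolling = vbt.Portfolio.from_order_func(
    price,
    order_func_nb,
    pre_sim_func_nb=pre_sim_func_nb,
    pre_sim_args=(63,),
    pre_segment_func_nb=pre_segment_func_nb.py_func,  # run as pure Python
    pre_segment_args=(pyopt_find_weights, history_len, ann_factor, num_tests, pyopt_srb_sharpe),  # 756-bar lookback instead of -1
    cash_sharing=True,
    group_by=True,
    use_numba=False
)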

Many thanks.

