Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 99 additions & 107 deletions src/pyrb/allocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@ def n(self):
return self.__n

def __init__(self, cov, pi=None, x=None):
"""
Base class for Risk Budgeting Allocation.

Parameters
----------
cov : array, shape (n, n)
Covariance matrix of the returns.

pi : array, shape(n,)
Expected excess return for each asset (the default is None which implies 0 for each asset).

x : array, shape(n,)
Array of weights.
"""Base class for Risk Budgeting Allocation.

Args:
cov: Covariance matrix of the returns, shape (n, n).
pi: Expected excess return for each asset, shape (n,).
The default is None which implies 0 for each asset.
x: Array of weights, shape (n,).
"""
self.__n = cov.shape[0]
if x is None:
Expand All @@ -58,7 +51,10 @@ def __init__(self, cov, pi=None, x=None):

@abstractmethod
def solve(self):
"""Solve the problem."""
"""Solve the problem.

This is an abstract method that must be implemented by subclasses.
"""
pass

@abstractmethod
Expand All @@ -67,26 +63,38 @@ def get_risk_contributions(self):
pass

def get_variance(self):
"""Get the portfolio variance: x.T * cov * x."""
"""Get the portfolio variance: x.T * cov * x.

Returns:
Portfolio variance as a float.
"""
x = self.x
cov = self.cov
x = tools.to_column_matrix(x)
cov = np.matrix(cov)
RC = np.multiply(x, cov * x)
cov = np.asarray(cov)
RC = np.multiply(x, cov @ x)
return np.sum(tools.to_array(RC))

def get_volatility(self):
"""Get the portfolio volatility: x.T * cov * x."""
"""Get the portfolio volatility: sqrt(x.T * cov * x).

Returns:
Portfolio volatility as a float.
"""
return self.get_variance() ** 0.5

def get_expected_return(self):
"""Get the portfolio expected excess returns: x.T * pi."""
"""Get the portfolio expected excess returns: x.T * pi.

Returns:
Portfolio expected excess return as a float, or NaN if pi is None.
"""
if self.pi is None:
return np.nan
else:
x = self.x
x = tools.to_column_matrix(x)
return float(x.T * self.pi)
return float(x.T @ self.pi)

def __str__(self):
return (
Expand All @@ -100,20 +108,22 @@ def __str__(self):

class EqualRiskContribution(RiskBudgetAllocation):
def __init__(self, cov):
"""
Solve the equal risk contribution problem using cyclical coordinate descent. Although this does not change
the optimal solution, the risk measure considered is the portfolio volatility.
"""Solve the equal risk contribution problem using cyclical coordinate descent.

Parameters
----------
cov : array, shape (n, n)
Covariance matrix of the returns.
Although this does not change the optimal solution, the risk measure
considered is the portfolio volatility.

Args:
cov: Covariance matrix of the returns, shape (n, n).
"""

RiskBudgetAllocation.__init__(self, cov)

def solve(self):
"""Solve the equal risk contribution problem using cyclical coordinate descent.

Updates the internal weights (x) and lambda_star attributes.
"""
x = solve_rb_ccd(cov=self.cov)
self._x = tools.to_array(x / x.sum())
self.lambda_star = self.get_volatility()
Expand All @@ -122,33 +132,33 @@ def get_risk_contributions(self, scale=True):
x = self.x
cov = self.cov
x = tools.to_column_matrix(x)
cov = np.matrix(cov)
RC = np.multiply(x, cov * x) / self.get_volatility()
cov = np.asarray(cov)
RC = np.multiply(x, cov @ x) / self.get_volatility()
if scale:
RC = RC / RC.sum()
return tools.to_array(RC)


class RiskBudgeting(RiskBudgetAllocation):
def __init__(self, cov, budgets):
"""
Solve the risk budgeting problem using cyclical coordinate descent. Although this does not change
the optimal solution, the risk measure considered is the portfolio volatility.
"""Solve the risk budgeting problem using cyclical coordinate descent.

Parameters
----------
cov : array, shape (n, n)
Covariance matrix of the returns.

budgets : array, shape(n,)
Risk budgets for each asset (the default is None which implies equal risk budget).
Although this does not change the optimal solution, the risk measure
considered is the portfolio volatility.

Args:
cov: Covariance matrix of the returns, shape (n, n).
budgets: Risk budgets for each asset, shape (n,).
"""
RiskBudgetAllocation.__init__(self, cov=cov)
validation.check_risk_budget(budgets, self.n)
self.budgets = budgets

def solve(self):
"""Solve the risk budgeting problem using cyclical coordinate descent.

Updates the internal weights (x) and lambda_star attributes.
"""
x = solve_rb_ccd(cov=self.cov, budgets=self.budgets)
self._x = tools.to_array(x / x.sum())
self.lambda_star = self.get_volatility()
Expand All @@ -157,32 +167,27 @@ def get_risk_contributions(self, scale=True):
x = self.x
cov = self.cov
x = tools.to_column_matrix(x)
cov = np.matrix(cov)
RC = np.multiply(x, cov * x) / self.get_volatility()
cov = np.asarray(cov)
RC = np.multiply(x, cov @ x) / self.get_volatility()
if scale:
RC = RC / RC.sum()
return tools.to_array(RC)


class RiskBudgetingWithER(RiskBudgetAllocation):
def __init__(self, cov, budgets=None, pi=None, c=1):
"""
Solve the risk budgeting problem for the standard deviation risk measure using cyclical coordinate descent.
The risk measure is given by R(x) = c * sqrt(x^T cov x) - pi^T x.

Parameters
----------
cov : array, shape (n, n)
Covariance matrix of the returns.

budgets : array, shape(n,)
Risk budgets for each asset (the default is None which implies equal risk budget).

pi : array, shape(n,)
Expected excess return for each asset (the default is None which implies 0 for each asset).

c : float
Risk aversion parameter equals to one by default.
"""Solve the risk budgeting problem for the standard deviation risk measure.

Uses cyclical coordinate descent. The risk measure is given by
R(x) = c * sqrt(x^T cov x) - pi^T x.

Args:
cov: Covariance matrix of the returns, shape (n, n).
budgets: Risk budgets for each asset, shape (n,).
Default is None which implies equal risk budget.
pi: Expected excess return for each asset, shape (n,).
Default is None which implies 0 for each asset.
c: Risk aversion parameter, default is 1.
"""
RiskBudgetAllocation.__init__(self, cov=cov, pi=pi)
validation.check_risk_budget(budgets, self.n)
Expand All @@ -198,8 +203,8 @@ def get_risk_contributions(self, scale=True):
x = self.x
cov = self.cov
x = tools.to_column_matrix(x)
cov = np.matrix(cov)
RC = np.multiply(x, cov * x) / self.get_volatility() * self.c - self.x * self.pi
cov = np.asarray(cov)
RC = np.multiply(x, cov @ x) / self.get_volatility() * self.c - self.x * self.pi
if scale:
RC = RC / RC.sum()
return tools.to_array(RC)
Expand All @@ -223,38 +228,31 @@ def __init__(
bounds=None,
solver="admm_ccd",
):
"""
Solve the constrained risk budgeting problem. It supports linear inequality (Cx <= d) and bounds constraints.
Notations follow the paper Constrained Risk Budgeting Portfolios by Richard J-C. and Roncalli T. (2019).

Parameters
----------
cov : array, shape (n, n)
Covariance matrix of the returns.

budgets : array, shape (n,)
Risk budgets for each asset (the default is None which implies equal risk budget).

pi : array, shape (n,)
Expected excess return for each asset (the default is None which implies 0 for each asset).

c : float
Risk aversion parameter equals to one by default.

C : array, shape (p, n)
Array of p inequality constraints. If None the problem is unconstrained and solved using CCD
(algorithm 3) and it solves equation (17).

d : array, shape (p,)
Array of p constraints that matches the inequalities.

bounds : array, shape (n, 2)
Array of minimum and maximum bounds. If None the default bounds are [0,1].

solver : basestring
"admm_ccd" (default): generalized standard deviation-based risk measure + linear constraints. The algorithm is ADMM_CCD (algorithm 4) and it solves equation (14).
"admm_qp" : mean variance risk measure + linear constraints. The algorithm is ADMM_QP and it solves equation (15).

"""Solve the constrained risk budgeting problem.

Supports linear inequality (Cx <= d) and bounds constraints.
Notations follow the paper Constrained Risk Budgeting Portfolios
by Richard J-C. and Roncalli T. (2019).

Args:
cov: Covariance matrix of the returns, shape (n, n).
budgets: Risk budgets for each asset, shape (n,).
Default is None which implies equal risk budget.
pi: Expected excess return for each asset, shape (n,).
Default is None which implies 0 for each asset.
c: Risk aversion parameter, default is 1.
C: Array of p inequality constraints, shape (p, n). If None the
problem is unconstrained and solved using CCD (algorithm 3)
and it solves equation (17).
d: Array of p constraints that matches the inequalities, shape (p,).
bounds: Array of minimum and maximum bounds, shape (n, 2).
If None the default bounds are [0,1].
solver: Solver method, either "admm_ccd" (default) or "admm_qp".
"admm_ccd": generalized standard deviation-based risk measure +
linear constraints. The algorithm is ADMM_CCD (algorithm 4) and
it solves equation (14).
"admm_qp": mean variance risk measure + linear constraints.
The algorithm is ADMM_QP and it solves equation (15).
"""

RiskBudgetingWithER.__init__(self, cov=cov, budgets=budgets, pi=pi, c=c)
Expand Down Expand Up @@ -340,32 +338,26 @@ def solve(self):
logging.exception("Problem not solved: " + str(e))

def get_risk_contributions(self, scale=True):
"""
Return the risk contribution. If the solver is "admm_qp" the mean variance risk
measure is considered.

Parameters
----------
scale : bool
If True, the sum on risk contribution is scaled to one.
"""Return the risk contribution.

Returns
-------
If the solver is "admm_qp" the mean variance risk measure is considered.

RC : array, shape (n,)
Returns the risk contribution of each asset.
Args:
scale: If True, the sum on risk contribution is scaled to one.

Returns:
Risk contribution of each asset, shape (n,).
"""
x = self.x
cov = self.cov
x = tools.to_column_matrix(x)
cov = np.matrix(cov)
cov = np.asarray(cov)

if self.solver == "admm_qp":
RC = np.multiply(x, cov * x) - self.c * self.x * self.pi
RC = np.multiply(x, cov @ x) - self.c * self.x * self.pi
else:
RC = np.multiply(
x, cov * x
x, cov @ x
).T / self.get_volatility() * self.c - tools.to_array(
self.x.T
) * tools.to_array(self.pi)
Expand Down
Loading