import matplotlib
matplotlib.use("TkAgg")
import matplotlib.pyplot as plt
from scipy.optimize import minimize
import numpy as np
import scipy.stats as stats
from scipy.stats import norm
from collections import namedtuple

from syscore.optimisation_utils import optimise, sigma_from_corr_and_std
from syscore.correlations import get_avg_corr, boring_corr_matrix

FUDGE_FACTOR_FOR_CORR_WEIGHT_UNCERTAINTY = 4.0
### Following is the optimisation code:

### First the main function.

def optimise_with_sigma(sigma, mean_list):
    """
    Given mean and covariance matrix, find portfolio weights that maximise SR

    We assume we can have as much leverage as we like to hit a given risk target, hence we can
    constrain weights to sum to 1 i.e. no cash option

    :param sigma: np.array NxN, covariance matrix
    :param mean_list: list of floats N in length, means
    :return: list of weights
    """
    mus = np.array(mean_list, ndmin=2).transpose()
    number_assets = sigma.shape[1]

    # Starting weights, equal weighting
    start_weights = [1.0 / number_assets] * number_assets

    # Set up constraints - positive weights, adding to 1.0
    bounds = [(0.0, 1.0)] * number_assets
    cdict = [{'type': 'eq', 'fun': addem}]

    ans = minimize(
        neg_sharpe_ratio,
        start_weights, (sigma, mus),
        method='SLSQP',
        bounds=bounds,
        constraints=cdict,
        tol=0.00001)

    weights = ans['x']

    return weights
## This is the function we optimise over

def neg_sharpe_ratio(weights, sigma, mus):
    """
    Returns minus return/risk (the negative Sharpe Ratio) for a portfolio

    :param weights: list of floats N in length
    :param sigma: np.array NxN, covariance matrix
    :param mus: np.array Nx1 in length, means
    :return: float
    """
    weights = np.matrix(weights)
    estreturn = (weights * mus)[0, 0]
    std_dev = (variance(weights, sigma)**.5)

    # Return minus the portfolio Sharpe Ratio (as we're minimising)
    return -estreturn / std_dev
## Portfolio standard deviation and variance given weights and covariance matrix sigma

def portfolio_stdev(weights, sigma):
    std_dev = (variance(weights, sigma)**.5)
    return std_dev


def variance(weights, sigma):
    # returns the variance (NOT standard deviation) given weights and sigma
    return (weights * sigma * weights.transpose())[0, 0]


def addem(weights):
    # Used for constraints, weights must sum to 1
    return 1.0 - sum(weights)


# Useful for working in standard deviation and correlation space
# rather than covariance space
def optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix):
    """
    Given mean, standard deviation and correlation matrix, find portfolio weights that maximise SR

    :param mean_list: list of N floats
    :param stdev_list: list of N floats
    :param corrmatrix: NxN np.array
    :return: list of floats
    """
    sigma = sigma_from_corr_and_std(stdev_list, corrmatrix)
    weights = optimise_with_sigma(sigma, mean_list)

    return weights
### Do all the examples in the slides
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.08], np.array([[1, 0.9, 0.9], [0.9, 1, 0.9], [0.9, 0.9, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.08], np.array([[1, 0.0, 0.0], [0.0, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.08], np.array([[1, 0.7, 0.0], [0.7, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.12], np.array([[1, 0.0, 0.0], [0.0, 1, 0.0], [0.0, 0.0, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.04], [0.08, 0.08, 0.085], np.array([[1, 0.9, 0.9], [0.9, 1, 0.9], [0.9, 0.9, 1]]))
optimise_with_corr_and_std([0.04, 0.04, 0.045], [0.08, 0.08, 0.08], np.array([[1, 0.9, 0.9], [0.9, 1, 0.9], [0.9, 0.9, 1]]))

""" This code is used by me to get the data from my trading system. You can ignore it

from systems.provided.futures_chapter15.basesystem import futures_system
system = futures_system()
data = dict([(code, system.rawdata.get_percentage_returns(code)) for code in ["SP500", "US10", "US5"]])

import pandas as pd
data = pd.concat(data, axis=1)
data = data.resample("7D").sum()
data = data[pd.datetime(1998,1,1):]
"""

import pandas as pd

data = pd.read_csv("returns.csv")
data.index = data['index']
del(data['index'])
WEEKS_IN_YEAR = 365.25 / 7

## Some functions to analyse data and return various statistics
## Assumes weekly returns

def annual_returns_from_weekly_data(data):
    return data.mean() * WEEKS_IN_YEAR


def annual_stdev_from_weekly_data(data):
    return data.std() * (WEEKS_IN_YEAR**.5)


def sharpe_ratio(data):
    SR = annual_returns_from_weekly_data(data) / annual_stdev_from_weekly_data(data)
    return SR
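
## Illustrative sanity check (added, with hypothetical numbers): the mean scales with time and the
## standard deviation with the square root of time, so the annual Sharpe Ratio is the weekly Sharpe
## Ratio multiplied by sqrt(WEEKS_IN_YEAR).
example_weekly_mean = 0.001    # hypothetical 0.1% average weekly return
example_weekly_stdev = 0.01    # hypothetical 1% weekly standard deviation
example_annual_mean = example_weekly_mean * WEEKS_IN_YEAR            # roughly 0.052
example_annual_stdev = example_weekly_stdev * (WEEKS_IN_YEAR ** .5)  # roughly 0.072
example_annual_SR = example_annual_mean / example_annual_stdev       # roughly 0.72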
def optimise_with_data(data):
    """
    Given some data, find the optimal SR portfolio

    :param data: pd.DataFrame containing N columns
    :return: list of N floats
    """
    mean_list = annual_returns_from_weekly_data(data).values
    stdev_list = annual_stdev_from_weekly_data(data).values
    corrmatrix = data.corr().values

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)

    return weights
## Bootstrapping code

import random

## Do optimisation with some random data

def optimisation_with_random_bootstrap(data):
    bootstrapped_data = get_bootstrap_series(data)
    weights = optimise_with_data(bootstrapped_data)

    return weights


## Get some random data

def get_bootstrap_series(data):
    length_of_series = len(data.index)
    # Sample row indices with replacement (randrange avoids the edge case where
    # random.uniform returns exactly length_of_series)
    random_indices = [random.randrange(length_of_series) for _unused in range(length_of_series)]
    bootstrap_data = data.iloc[random_indices]

    return bootstrap_data
def bootstrap_optimisation_distributions(data, monte_count=1000):
    """
    Bootstrap over data, returning the distribution of portfolio weights

    :param data: pd.DataFrame containing N columns
    :param monte_count: int, how many bootstraps to do
    :return: pd.DataFrame of weights, N columns and monte_count rows
    """
    dist_of_weights = []
    for i in range(monte_count):
        single_bootstrap_weights = optimisation_with_random_bootstrap(data)
        dist_of_weights.append(single_bootstrap_weights)

    dist_of_weights = pd.DataFrame(dist_of_weights)
    dist_of_weights.columns = data.columns

    return dist_of_weights
## Generate data for the slides
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)

plt.hist(weight_distr.SP500, bins=100)
plt.hist(weight_distr.US10, bins=100)
plt.hist(weight_distr.US5, bins=100)
## Now for the plots showing the sensitivity of weights to a given factor, eg Sharpe Ratio
## First we need a function that allows us to estimate correlations for a given 'code' eg SP500/US10

def correlation_for_code(data):
    """
    Calculate correlation matrix and return as dict

    :param data: pd.DataFrame with column names eg x, y, z...
    :return: dict with keys eg x/y, y/z, x/z, y/x, z/y, z/x ... containing correlations
    """
    corr = data.corr()
    instruments = data.columns

    results_dict = {}
    for instr1 in instruments:
        for instr2 in instruments:
            code = join_corr_code(instr1, instr2)
            results_dict[code] = corr.loc[instr1, instr2]

    return results_dict


def split_corr_code(code):
    split_code = code.split("/")
    instr1 = split_code[0]
    instr2 = split_code[1]

    return instr1, instr2


def join_corr_code(instr1, instr2):
    return '%s/%s' % (instr1, instr2)
## Now we have functions that allow us to find the factor distribution for any factor
func_dict = dict(sharpe=sharpe_ratio, stdev=annual_stdev_from_weekly_data,
                 corr=correlation_for_code)

factor_list = list(func_dict.keys())
def plot_changeling(code, factor, data):
    """
    Top level function which gets the data, plots and analyses it

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :return: None
    """
    assert factor in factor_list

    # Get the data
    factor_distribution, weights_data = changeling_graph_data(code, factor, data)

    # Now make two plots
    fig, (ax1, ax2) = plt.subplots(2)
    fig.suptitle("Distribution of %s for %s, average %.2f" % (factor, code, np.mean(factor_distribution)))
    ax1.hist(factor_distribution, bins=50)
    weights_data.plot(ax=ax2)

    # Now analyse
    analyse_changeling_results(code, factor, factor_distribution, data)
def changeling_graph_data(code, factor, data):
    """
    Get the data required for the graphing: a factor distribution and the matched weights

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :return: tuple: factor_distribution and weights
    """
    factor_distribution = get_factor_distribution(code, factor, data)
    weights_data = get_weights_data(code, factor, data, factor_distribution)

    return factor_distribution, weights_data
def get_factor_distribution(code, factor, data, monte_length=10000):
    """
    Get the bootstrapped distribution of a given factor value

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param monte_length: int, how many monte carlo runs to do
    :return: list of float
    """
    factor_func = func_dict[factor]

    factor_distr = []
    for _not_used in range(monte_length):
        bootstrap_data = get_bootstrap_series(data)
        factor_estimate = factor_func(bootstrap_data)
        factor_estimate_for_code = factor_estimate[code]
        factor_distr.append(factor_estimate_for_code)

    # Drop the most extreme 1% of values at each end of the distribution
    factor_distr.sort()
    drop_values = int(monte_length * .01)
    factor_distr = factor_distr[drop_values:-drop_values]

    return factor_distr
def get_weights_data(code, factor, data, factor_distribution, points_to_use=100):
    """
    Optimise weights, using the statistics in data, but replacing the estimate of factor for code
    by various points on the factor distribution.

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param factor_distribution: list of float
    :param points_to_use: int, how many points on the factor distribution
    :return: pd.DataFrame of weights, same columns as data, length is points_to_use
    """
    factor_range = [np.min(factor_distribution), np.max(factor_distribution)]
    factor_step = (factor_range[1] - factor_range[0]) / points_to_use
    factor_values_to_test = np.arange(start=factor_range[0], stop=factor_range[1], step=factor_step)

    weight_results = []
    for factor_value_to_use in factor_values_to_test:
        weights = optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use)
        weight_results.append(weights)

    weight_results = pd.DataFrame(weight_results)
    weight_results.columns = data.columns

    return weight_results
def optimise_with_replaced_factor_value(data, code, factor, factor_value_to_use):
    """
    Optimise weights, using the statistics in data, but replacing the estimate of factor for code
    by factor_value_to_use

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param factor_value_to_use: float
    :return: list of floats, weights
    """
    mean_list, stdev_list, corrmatrix = replace_factor_value(data, code, factor, factor_value_to_use)
    weights = optimise_with_corr_and_std(mean_list, stdev_list, corrmatrix)

    return weights
def replace_factor_value(data, code, factor, factor_value_to_use):
    """
    Estimate statistics from data, but replacing the estimate of factor for code
    by factor_value_to_use

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param data: pd.DataFrame
    :param factor_value_to_use: float
    :return: 3 tuple: mean_list, stdev_list, corrmatrix
    """
    # First standard deviation
    stdev_list = annual_stdev_from_weekly_data(data)
    if factor == "stdev":
        stdev_list[code] = factor_value_to_use
    stdev_list = stdev_list.values

    # Next Sharpe Ratio
    sharpe_list = sharpe_ratio(data)
    if factor == "sharpe":
        sharpe_list[code] = factor_value_to_use
    sharpe_list = sharpe_list.values

    # We don't use the mean estimate directly; instead we derive it from the Sharpe Ratios
    mean_list = stdev_list * sharpe_list

    # Now correlations. Needs a fancy function
    corrmatrix = data.corr()
    if factor == "corr":
        corrmatrix = replace_corr_value(corrmatrix, code, factor_value_to_use)
    corrmatrix = corrmatrix.values

    return mean_list, stdev_list, corrmatrix
def replace_corr_value(corrmatrix, code, factor_value_to_use):
    """
    Given a correlation matrix, replace one value (actually two, because of symmetry)

    :param corrmatrix: NxN matrix with labels
    :param code: str 'A/B' where A and B are labels of the correlation matrix
    :param factor_value_to_use: float, to replace A/B and B/A correlation with
    :return: new NxN correlation matrix
    """
    instr1, instr2 = split_corr_code(code)
    corrmatrix.loc[instr1, instr2] = factor_value_to_use
    corrmatrix.loc[instr2, instr1] = factor_value_to_use

    return corrmatrix
def analyse_changeling_results(code, factor, factor_distribution, data):
    """
    Prints some analysis of factor_distribution

    :param code: str. Must be a column name in data, or 'A/B' if correlations are used
    :param factor: str. Must be in factor_list
    :param factor_distribution: list of float
    :param data: pd.DataFrame
    :return: None, just prints
    """
    factor5 = np.percentile(factor_distribution, 5)
    factor95 = np.percentile(factor_distribution, 95)
    print("There is a 90%% chance that %s for %s was between %.2f and %.2f" % (factor, code, factor5, factor95))

    weights5 = optimise_with_replaced_factor_value(data, code, factor, factor5)
    weights95 = optimise_with_replaced_factor_value(data, code, factor, factor95)

    ## Get the weights we need for code
    ## For correlations return the weights of the first item in the pair 'A/B'
    if factor == "corr":
        code = split_corr_code(code)[0]

    # Weights are a list not a dict, so we need to know the position of the right weight
    instruments = list(data.columns)
    code_index = instruments.index(code)

    weight5_code = weights5[code_index]
    weight95_code = weights95[code_index]

    print("Giving weights for %s between %.3f and %.3f" % (code, weight5_code, weight95_code))
# Now do the plots in the slides
plot_changeling("SP500", "sharpe", data)
plot_changeling("US10", "sharpe", data)
plot_changeling("US5", "sharpe", data)

plot_changeling("SP500", "stdev", data)
plot_changeling("US10", "stdev", data)
plot_changeling("US5", "stdev", data)

plot_changeling("SP500/US10", "corr", data)
plot_changeling("SP500/US5", "corr", data)
plot_changeling("US5/US10", "corr", data)
## Let's do optimisation better!

## 1) Bootstrapped weights
weight_distr = bootstrap_optimisation_distributions(data, monte_count=1000)
weight_distr.mean()
## 2) Set some values to be identical

def optimise_with_identical_values(data, identical_SR=False, identical_stdev=False, identical_corr=False):
    # Optimise, optionally replacing the estimated SR, stdev and/or correlations with identical average values
    if identical_stdev:
        stdev_list = get_identical_stdev(data)
    else:
        stdev_list = annual_stdev_from_weekly_data(data).values

    if identical_SR:
        ## We want means, but derive them from SR based on the standard deviations we have
        mean_list = get_means_assuming_identical_SR(data, stdev_list)
    else:
        mean_list = annual_returns_from_weekly_data(data).values

    if identical_corr:
        corr_matrix = get_identical_corr(data)
    else:
        corr_matrix = data.corr().values

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)

    return weights
def get_identical_corr(data):
    ## Returns a boring correlation matrix whose off diagonal elements are equal to the average correlation
    instrument_count = len(data.columns)
    estimated_corr = data.corr().values
    avg_corr = get_avg_corr(estimated_corr)

    corrmatrix = boring_corr_matrix(instrument_count, offdiag=avg_corr)

    return corrmatrix


def get_identical_stdev(data):
    ## Returns a vector of standard deviations whose elements are the average standard deviation in the data
    estimated_stdev = annual_stdev_from_weekly_data(data)
    instrument_count = len(data.columns)
    average_stdev = estimated_stdev.mean()

    stdev_list = [average_stdev] * instrument_count

    return stdev_list


def get_means_assuming_identical_SR(data, using_stdev):
    ## Returns a vector of means assuming equal SR, identical to the average in the data,
    ## and using the standard deviations provided
    average_SR = sharpe_ratio(data).mean()
    mean_list = [stdev * average_SR for stdev in using_stdev]

    return mean_list
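
## Illustrative usage (added; these calls are not from the original slides). For example, assuming
## identical SR keeps the estimated standard deviations and correlations but gives every asset the
## average Sharpe Ratio; setting all three flags reduces the problem to equal weights by symmetry.
optimise_with_identical_values(data, identical_SR=True)
optimise_with_identical_values(data, identical_SR=True, identical_corr=True)
optimise_with_identical_values(data, identical_SR=True, identical_stdev=True, identical_corr=True)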
from copy import copy

## 3) Bayesian optimisation

def optimise_with_shrinkage_parameters(data, SR_shrinkage=0.0, corr_shrinkage=0.0, stdev_shrinkage=0.0):
    """
    Optimise using a mixture of estimates derived from the data and a prior of identical average values

    :param data: pd.DataFrame
    :param SR_shrinkage: float, 0 = no shrinkage, just use the data; 1 = just use the prior (identical, average values)
    :param corr_shrinkage: float
    :param stdev_shrinkage: float
    :return: list of floats, weights
    """
    prior_stdev_list = np.array(get_identical_stdev(data))
    estimated_stdev_list = annual_stdev_from_weekly_data(data).values
    stdev_list = prior_stdev_list * stdev_shrinkage + estimated_stdev_list * (1 - stdev_shrinkage)

    prior_mean_list = np.array(get_means_assuming_identical_SR(data, stdev_list))
    estimated_mean_list = annual_returns_from_weekly_data(data).values
    mean_list = prior_mean_list * SR_shrinkage + estimated_mean_list * (1 - SR_shrinkage)

    prior_corr_matrix = get_identical_corr(data)
    estimated_corr_matrix = data.corr().values
    corr_matrix = prior_corr_matrix * corr_shrinkage + estimated_corr_matrix * (1 - corr_shrinkage)

    weights = optimise_with_corr_and_std(mean_list, stdev_list, corr_matrix)

    return weights
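
## Illustrative usage (added; shrinkage values are hypothetical): shrink Sharpe Ratios halfway towards
## the identical-SR prior and correlations a quarter of the way towards the average correlation,
## while using the estimated standard deviations unchanged.
optimise_with_shrinkage_parameters(data, SR_shrinkage=0.5, corr_shrinkage=0.25, stdev_shrinkage=0.0)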
## 4) Optimisation through integration

def optimised_weights_given_correlation_uncertainty(corr_matrix, data_points, p_step=0.25):
    # Average the optimised weights over a grid of points on the sampling distribution
    # of each pairwise correlation (three assets assumed)
    dist_points = np.arange(p_step, stop=(1 - p_step) + 0.000001, step=p_step)
    list_of_weights = []
    for conf1 in dist_points:
        for conf2 in dist_points:
            for conf3 in dist_points:
                conf_intervals = labelledCorrelations(conf1, conf2, conf3)
                weights = optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points)
                list_of_weights.append(weights)

    array_of_weights = np.array(list_of_weights)
    average_weights = np.nanmean(array_of_weights, axis=0)

    return average_weights


labelledCorrelations = namedtuple("labelledCorrelations", 'ab ac bc')
def optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points):
    labelled_correlations = extract_asset_pairwise_correlations_from_matrix(corr_matrix)
    labelled_correlation_points = calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points)
    corr_matrix_at_distribution_point = three_asset_corr_matrix(labelled_correlation_points)
    weights = optimise_for_corr_matrix(corr_matrix_at_distribution_point)

    return weights


def extract_asset_pairwise_correlations_from_matrix(corr_matrix):
    ab = corr_matrix[0][1]
    ac = corr_matrix[0][2]
    bc = corr_matrix[1][2]

    return labelledCorrelations(ab=ab, ac=ac, bc=bc)


def calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points):
    correlation_point_list = [get_correlation_distribution_point(corr_value, data_points, confidence_interval)
                              for corr_value, confidence_interval in
                              zip(labelled_correlations, conf_intervals)]
    labelled_correlation_points = labelledCorrelations(*correlation_point_list)

    return labelled_correlation_points
def get_correlation_distribution_point(corr_value, data_points, conf_interval):
    fisher_corr = fisher_transform(corr_value)
    point_in_fisher_units = \
        get_fisher_confidence_point(fisher_corr, data_points, conf_interval)

    point_in_natural_units = inverse_fisher(point_in_fisher_units)

    return point_in_natural_units


def fisher_transform(corr_value):
    if corr_value >= 1.0:
        corr_value = 0.99999999999999
    elif corr_value <= -1.0:
        corr_value = -0.99999999999999

    return 0.5 * np.log((1 + corr_value) / (1 - corr_value))  # also arctanh


def get_fisher_confidence_point(fisher_corr, data_points, conf_interval):
    if conf_interval < 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, conf_interval)
        point_in_fisher_units = fisher_corr - confidence_in_fisher_units
    elif conf_interval > 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, 1 - conf_interval)
        point_in_fisher_units = fisher_corr + confidence_in_fisher_units
    else:
        point_in_fisher_units = fisher_corr

    return point_in_fisher_units


def fisher_confidence(data_points, conf_interval):
    data_point_root = fisher_stdev(data_points) * FUDGE_FACTOR_FOR_CORR_WEIGHT_UNCERTAINTY
    conf_point = get_confidence_point(conf_interval)

    return data_point_root * conf_point


def fisher_stdev(data_points):
    data_point_root = 1 / ((data_points - 3)**.5)

    return data_point_root


def get_confidence_point(conf_interval):
    conf_point = norm.ppf(1 - (conf_interval / 2))

    return conf_point


def inverse_fisher(fisher_corr_value):
    return (np.exp(2 * fisher_corr_value) - 1) / (np.exp(2 * fisher_corr_value) + 1)
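
## Illustrative worked example (added, with hypothetical numbers). The Fisher transform of a sample
## correlation is roughly Gaussian with standard deviation 1/sqrt(n - 3), widened here by the fudge
## factor; these calls map a point estimate of 0.9 from 100 observations onto points of its sampling
## distribution, expressed back in correlation space.
example_corr_low = get_correlation_distribution_point(0.9, 100, 0.25)   # below the point estimate
example_corr_mid = get_correlation_distribution_point(0.9, 100, 0.5)    # the point estimate itself
example_corr_high = get_correlation_distribution_point(0.9, 100, 0.75)  # above the point estimate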
def three_asset_corr_matrix(labelled_correlations):
    """
    :return: np.array, 2 dimensional 3x3 correlation matrix
    """
    ab = labelled_correlations.ab
    ac = labelled_correlations.ac
    bc = labelled_correlations.bc

    m = [[1.0, ab, ac], [ab, 1.0, bc], [ac, bc, 1.0]]
    m = np.array(m)

    return m
def optimise_for_corr_matrix(corr_matrix):
    ## Arbitrary but identical means and standard deviations, so the weights depend only on the correlations
    mean_list = [.05] * 3
    std = [.1] * 3

    return optimise_with_corr_and_std(mean_list, std, corr_matrix)
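
## Illustrative usage (added, with hypothetical inputs): average the optimised weights over correlation
## uncertainty for the highly correlated three asset example from the slides, as if we had roughly ten
## years of weekly data (520 observations).
example_corr_matrix = np.array([[1, 0.9, 0.9], [0.9, 1, 0.9], [0.9, 0.9, 1]])
optimised_weights_given_correlation_uncertainty(example_corr_matrix, 520, p_step=0.25)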
def multiplier_from_relative_SR(relative_SR, avg_correlation, years_of_data):
    # Return a multiplier
    # 1 implies no adjustment required
    ratio = mini_bootstrap_ratio_given_SR_diff(
        relative_SR, avg_correlation, years_of_data
    )

    return ratio
def mini_bootstrap_ratio_given_SR_diff(
    SR_diff,
    avg_correlation,
    years_of_data,
    avg_SR=0.5,
    std=0.15,
    how_many_assets=2,
    p_step=0.01,
):
    """
    Do a parametric bootstrap of portfolio weights to tell you what the ratio of weights should be between
    an asset which has a higher backtested SR (by SR_diff) and another asset(s) with the average Sharpe
    Ratio (avg_SR)

    All assets are assumed to have the same standard deviation and correlation

    :param SR_diff: Difference in performance in Sharpe Ratio (SR) units between one asset and the rest
    :param avg_correlation: Average correlation across the portfolio
    :param years_of_data: How many years of data do you have (can be float for partial years)
    :param avg_SR: Should be realistic for your type of trading
    :param std: Standard deviation (doesn't affect results, just a scaling parameter)
    :param how_many_assets: How many assets in the imaginary portfolio
    :param p_step: Step size to go through in the CDF of the mean estimate
    :return: float, ratio of weight of asset with different SR to 1/n weight
    """
    dist_points = np.arange(p_step, stop=(1 - p_step) + 0.00000001, step=p_step)
    list_of_weights = [
        weights_given_SR_diff(
            SR_diff,
            avg_correlation,
            confidence_interval,
            years_of_data,
            avg_SR=avg_SR,
            std=std,
            how_many_assets=how_many_assets,
        )
        for confidence_interval in dist_points
    ]

    array_of_weights = np.array(list_of_weights)
    average_weights = np.nanmean(array_of_weights, axis=0)
    ratio_of_weights = weight_ratio(average_weights)

    if np.sign(ratio_of_weights - 1.0) != np.sign(SR_diff):
        # This shouldn't happen, and only occurs because weight distributions
        # get curtailed at zero
        return 1.0

    return ratio_of_weights
def weight_ratio(weights):
    """
    Return the ratio of the weight of the first asset to the 1/N weight

    :param weights: list of float
    :return: float
    """
    one_over_N_weight = 1.0 / len(weights)
    weight_first_asset = weights[0]

    return weight_first_asset / one_over_N_weight
def weights_given_SR_diff(
    SR_diff,
    avg_correlation,
    confidence_interval,
    years_of_data,
    avg_SR=0.5,
    std=0.15,
    how_many_assets=2,
):
    """
    Return the optimised weights for an asset with an unusual SR versus the other asset(s)

    :param SR_diff: Difference between the SR and the average SR. 0.0 indicates same as average
    :param avg_correlation: Average correlation amongst assets
    :param confidence_interval: How confident are we about our mean estimate (i.e. cdf point)
    :param years_of_data: How many years of data we have
    :param avg_SR: Average SR to use for the other asset(s)
    :param std: Standard deviation to use
    :param how_many_assets: How many assets are we optimising over (I only consider 2, but let's keep it general)
    :return: list of floats, weights
    """
    average_mean = avg_SR * std
    asset1_mean = (SR_diff + avg_SR) * std

    mean_difference = asset1_mean - average_mean

    # Work out what the mean is with appropriate confidence
    confident_mean_difference = calculate_confident_mean_difference(
        std, years_of_data, mean_difference, confidence_interval, avg_correlation)

    confident_asset1_mean = confident_mean_difference + average_mean

    mean_list = [confident_asset1_mean] + \
        [average_mean] * (how_many_assets - 1)

    weights = optimise_using_correlation(mean_list, avg_correlation, std)

    return list(weights)
def optimise_using_correlation(mean_list, avg_correlation, std):
    corr_matrix = boring_corr_matrix(len(mean_list), offdiag=avg_correlation)
    stdev_list = [std] * len(mean_list)
    sigma = sigma_from_corr_and_std(stdev_list, corr_matrix)

    return optimise(sigma, mean_list)


def calculate_confident_mean_difference(
    std, years_of_data, mean_difference, confidence_interval, avg_correlation
):
    omega_difference = calculate_omega_difference(
        std, years_of_data, avg_correlation)
    confident_mean_difference = stats.norm(
        mean_difference, omega_difference).ppf(confidence_interval)

    return confident_mean_difference


def calculate_omega_difference(std, years_of_data, avg_correlation):
    omega_one_asset = std / (years_of_data) ** 0.5
    omega_variance_difference = 2 * \
        (omega_one_asset ** 2) * (1 - avg_correlation)
    omega_difference = omega_variance_difference ** 0.5

    return omega_difference
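
## Added note: the standard error of an annual mean estimated from years_of_data years is
## std / sqrt(years_of_data). For the difference of two correlated means,
## Var(m1 - m2) = Var(m1) + Var(m2) - 2 * Cov(m1, m2) = 2 * omega**2 * (1 - avg_correlation),
## which is the quantity calculate_omega_difference takes the square root of.
## Illustrative usage of the heuristic multiplier (hypothetical inputs): an asset whose backtested SR
## is 0.3 higher than average, with average correlation 0.5 and 10 years of data.
multiplier_from_relative_SR(0.3, 0.5, 10)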
def norm_weights(list_of_weights):
    norm_weights = list(np.array(list_of_weights) / np.sum(list_of_weights))
    return norm_weights


def adjust_weights_for_SR(weights, SR_list, years_of_data, avg_correlation):
    """
    Adjust weights according to the heuristic method

    :param weights: list of float, starting weights
    :param SR_list: np.array (or list) of Sharpe Ratios
    :param years_of_data: float
    :param avg_correlation: float, average correlation across the portfolio
    :return: list of adjusted weights
    """
    assert len(weights) == len(SR_list)

    # Ensure the arithmetic below works even if a plain list of SRs is passed
    SR_list = np.array(SR_list)

    avg_SR = np.nanmean(SR_list)
    relative_SR_list = SR_list - avg_SR
    multipliers = [
        float(multiplier_from_relative_SR(relative_SR, avg_correlation, years_of_data))
        for relative_SR in relative_SR_list
    ]

    new_weights = list(np.array(weights) * np.array(multipliers))

    norm_new_weights = norm_weights(new_weights)

    return norm_new_weights
corr_matrix = data.corr().values
corr_weights = optimised_weights_given_correlation_uncertainty(corr_matrix, len(data.index), p_step=0.1)

avg_correlation = get_avg_corr(corr_matrix)
WEEKS_IN_YEAR = 365.25 / 7.0
years_of_data = len(data.index) / WEEKS_IN_YEAR  # weekly data
SR_list = list(sharpe_ratio(data).values)

## These are risk weights
adjusted_weights = adjust_weights_for_SR(
    corr_weights, SR_list, years_of_data, avg_correlation
)