### The following code is boilerplate for the optimisation
import datetime  # needed later for date slicing

import matplotlib
matplotlib.use("TkAgg")
import pandas as pd
from scipy.optimize import minimize
import numpy as np
from scipy.stats import norm
from collections import namedtuple

def optimise_for_corr_matrix(corr_matrix):
    # Identical means and stdevs, so only the correlations drive the weights
    mean_list = [.05] * 3
    std = .1

    return optimise_inner(mean_list, corr_matrix, std)

def optimise_inner(mean_list, corr_matrix, std):
    stdev_list = [std] * len(mean_list)
    sigma = sigma_from_corr_and_std(stdev_list, corr_matrix)

    return optimise_with_sigma(sigma, mean_list)

def optimise_with_sigma(sigma, mean_list):
    mus = np.array(mean_list, ndmin=2).transpose()
    number_assets = sigma.shape[1]
    start_weights = [1.0 / number_assets] * number_assets

    # Constraints - positive weights, adding to 1.0
    bounds = [(0.0, 1.0)] * number_assets
    cdict = [{'type': 'eq', 'fun': addem}]
    ans = minimize(
        neg_SR,
        start_weights, (sigma, mus),
        method='SLSQP',
        bounds=bounds,
        constraints=cdict,
        tol=0.00001)

    weights = ans['x']
    return weights

def neg_SR(weights, sigma, mus):
    # Returns minus the Sharpe Ratio (as we're minimising)
    weights = np.array(weights)
    estreturn = weights @ mus.flatten()
    std_dev = variance(weights, sigma) ** .5

    return -estreturn / std_dev

def variance(weights, sigma):
    # returns the variance (NOT standard deviation) given weights and sigma
    return weights @ sigma @ weights

def sigma_from_corr_and_std(stdev_list, corrmatrix):
    # Covariance: sigma_ij = stdev_i * corr_ij * stdev_j
    stdev = np.array(stdev_list, ndmin=2).transpose()
    sigma = stdev * corrmatrix * stdev.transpose()

    return sigma

def addem(weights):
    # Used for constraints: equals zero when weights sum to 1.0
    return 1.0 - sum(weights)

labelledCorrelations = namedtuple("labelledCorrelations", 'ab ac bc')

def three_asset_corr_matrix(labelled_correlations):
    """
    :return: np.array, 2 dimensions, 3 x 3
    """
    ab = labelled_correlations.ab
    ac = labelled_correlations.ac
    bc = labelled_correlations.bc

    m = [[1.0, ab, ac], [ab, 1.0, bc], [ac, bc, 1.0]]
    m = np.array(m)
    return m

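# A quick sanity check of the boilerplate above (illustrative values, not part
# of the original analysis): with identical means, stdevs and pairwise
# correlations the three assets are interchangeable, so the optimiser should
# return weights close to 1/3 each.
equal_corr = three_asset_corr_matrix(labelledCorrelations(ab=0.5, ac=0.5, bc=0.5))
print(optimise_for_corr_matrix(equal_corr))  # approximately [0.333 0.333 0.333]
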
def get_correlation_distribution_point(corr_value, data_points, conf_interval):
    fisher_corr = fisher_transform(corr_value)
    point_in_fisher_units = \
        get_fisher_confidence_point(fisher_corr, data_points, conf_interval)

    point_in_natural_units = inverse_fisher(point_in_fisher_units)
    return point_in_natural_units

def fisher_transform(corr_value):
    if corr_value >= 1.0:
        corr_value = 0.99999999999999
    elif corr_value <= -1.0:
        corr_value = -0.99999999999999
    return 0.5 * np.log((1 + corr_value) / (1 - corr_value))  # also arctanh

def get_fisher_confidence_point(fisher_corr, data_points, conf_interval):
    if conf_interval < 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, conf_interval)
        point_in_fisher_units = fisher_corr - confidence_in_fisher_units
    elif conf_interval > 0.5:
        confidence_in_fisher_units = fisher_confidence(data_points, 1 - conf_interval)
        point_in_fisher_units = fisher_corr + confidence_in_fisher_units
    else:
        point_in_fisher_units = fisher_corr

    return point_in_fisher_units

def fisher_confidence(data_points, conf_interval):
    # note the Fisher stdev is scaled up by a factor of 4 here
    data_point_root = fisher_stdev(data_points) * 4
    conf_point = get_confidence_point(conf_interval)

    return data_point_root * conf_point

def fisher_stdev(data_points):
    data_point_root = 1 / ((data_points - 3) ** .5)
    return data_point_root

def get_confidence_point(conf_interval):
    conf_point = norm.ppf(1 - (conf_interval / 2))
    return conf_point

def inverse_fisher(fisher_corr_value):
    return (np.exp(2 * fisher_corr_value) - 1) / (np.exp(2 * fisher_corr_value) + 1)

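# Example of the Fisher machinery above (numbers are illustrative): with a
# sample correlation of 0.6 estimated from 100 data points, conf_interval=0.5
# recovers the sample value, while lower/higher conf_intervals give points
# below/above it in the sampling distribution.
print(get_correlation_distribution_point(0.6, 100, 0.5))  # 0.6 exactly
print(get_correlation_distribution_point(0.6, 100, 0.1))  # well below 0.6
print(get_correlation_distribution_point(0.6, 100, 0.9))  # well above 0.6
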
def optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points):
    labelled_correlations = extract_asset_pairwise_correlations_from_matrix(corr_matrix)
    labelled_correlation_points = calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points)
    corr_matrix = three_asset_corr_matrix(labelled_correlation_points)
    weights = optimise_for_corr_matrix(corr_matrix)

    return weights

def extract_asset_pairwise_correlations_from_matrix(corr_matrix):
    ab = corr_matrix[0][1]
    ac = corr_matrix[0][2]
    bc = corr_matrix[1][2]

    return labelledCorrelations(ab=ab, ac=ac, bc=bc)

def calculate_correlation_points_from_tuples(labelled_correlations, conf_intervals, data_points):
    correlation_point_list = [get_correlation_distribution_point(corr_value, data_points, confidence_interval)
                              for corr_value, confidence_interval in
                              zip(labelled_correlations, conf_intervals)]
    labelled_correlation_points = labelledCorrelations(*correlation_point_list)

    return labelled_correlation_points

def optimised_weights_given_correlation_uncertainty(corr_matrix, data_points, p_step=0.1):
    dist_points = np.arange(p_step, (1 - p_step) + 0.000001, p_step)
    list_of_weights = []
    for conf1 in dist_points:
        for conf2 in dist_points:
            for conf3 in dist_points:
                conf_intervals = labelledCorrelations(conf1, conf2, conf3)
                weights = optimise_for_corr_matrix_with_uncertainty(corr_matrix, conf_intervals, data_points)
                list_of_weights.append(weights)

    array_of_weights = np.array(list_of_weights)
    average_weights = np.nanmean(array_of_weights, axis=0)

    return average_weights

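# Usage sketch (illustrative parameters): average the optimal weights over the
# sampled correlation distribution. A coarse p_step keeps this quick: with
# p_step=0.25 there are three distribution points per pairwise correlation,
# so 3**3 = 27 optimisations.
example_corr = three_asset_corr_matrix(labelledCorrelations(ab=0.9, ac=0.3, bc=0.3))
print(optimised_weights_given_correlation_uncertainty(example_corr, data_points=100, p_step=0.25))
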
def apply_min_weight(average_weights):
    weights_with_min = [min_weight(weight) for weight in average_weights]
    adj_weights = weights_sum_to_total(weights_with_min)

    return adj_weights

def min_weight(weight):
    # Floor each weight at 10%
    if weight < 0.1:
        return 0.1
    else:
        return weight

def weights_sum_to_total(weights_with_min):
    # Renormalise so the floored weights still sum to 1.0
    sum_weights = np.sum(weights_with_min)
    adj_weights = np.array(weights_with_min) / sum_weights
    return adj_weights

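# Usage sketch with made-up weights: floor each weight at 10% and renormalise
# so the results still sum to 1.0.
print(apply_min_weight(np.array([0.05, 0.25, 0.70])))  # approx [0.095, 0.238, 0.667]
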
# get some data
from systems.provided.futures_chapter15.basesystem import futures_system
system = futures_system()

## underlying returns
#instrument_list = ["US10", "SP500", "US5"] # sub portfolio
config = system.config
del config.instrument_weights
system = futures_system(config=config)
instrument_list = system.get_instrument_list()

perc_returns = [system.rawdata.get_percentage_returns(instrument_code) for instrument_code in instrument_list]
perc_returns_df = pd.concat(perc_returns, axis=1)
perc_returns_df.columns = instrument_list

## subsystem returns
subsys_returns = [system.accounts.pandl_for_subsystem(instrument_code) for instrument_code in instrument_list]
subsys_returns_df = pd.concat(subsys_returns, axis=1)
subsys_returns_df.columns = instrument_list

## trading rule returns
def get_rule_returns(instrument_code):
    rules = list(system.rules.trading_rules().keys())
    rule_returns = [system.accounts.pandl_for_instrument_forecast(instrument_code, rule) for rule in rules]
    rule_returns_df = pd.concat(rule_returns, axis=1)
    rule_returns_df.columns = rules

    return rule_returns_df

def get_forecast_and_future_corr(Nweeks_back, Nweeks_forward):
    forecast = get_historic_correlations(Nweeks_back)
    future = get_future_correlations(Nweeks_forward)  # for fixed period

    pd_result = merge_forecast_and_future(forecast, future, Nweeks_forward)
    return pd_result

def merge_forecast_and_future(forecast, future, Nweeks_forward):
    assets = forecast.columns  # should be the same for both; won't check
    pd_result = []
    for asset in assets:
        result_for_asset = pd.concat([forecast[asset], future[asset]], axis=1)

        # remove the tail with nothing in it
        result_for_asset = result_for_asset[:-Nweeks_forward]

        # remove overlapping periods which bias R^2
        selector = range(0, len(result_for_asset.index), Nweeks_forward)
        result_for_asset = result_for_asset.iloc[selector]

        result_for_asset.columns = ['forecast', 'turnout']
        pd_result.append(result_for_asset)

    pd_result = pd.concat(pd_result, axis=0)
    return pd_result

def get_future_correlations(Nweeks_forward):
    corr = get_rolling_correlations(use_returns, Nweeks_forward)
    corr = corr.ffill()
    future_corr = corr.shift(-Nweeks_forward)

    return future_corr

def get_historic_correlations(Nweeks_back):
    corr = get_rolling_correlations(use_returns, Nweeks_back)
    corr = corr.ffill()

    return corr

def get_rolling_correlations(use_returns, Nperiods):
    roll_df = use_returns.rolling(Nperiods, min_periods=4).corr()
    perm_names = get_asset_perm_names(use_returns)
    roll_list = [get_rolling_corr_for_perm_pair(perm_pair, roll_df) for perm_pair in perm_names]
    roll_list_df = pd.concat(roll_list, axis=1)
    roll_list_df.columns = ["%s/%s" % (asset1, asset2) for (asset1, asset2) in perm_names]

    return roll_list_df

def get_asset_perm_names(use_returns):
    asset_names = use_returns.columns
    permlist = []
    for asset1 in asset_names:
        for asset2 in asset_names:
            if asset1 == asset2:
                continue
            pairing = [asset1, asset2]
            if pairing in permlist:
                continue
            pairing.reverse()
            if pairing in permlist:
                continue
            permlist.append(pairing)

    return permlist

def get_rolling_corr_for_perm_pair(perm_pair, roll_df):
    # cross-section: the rolling correlation of perm_pair[0] with perm_pair[1]
    return roll_df[perm_pair[0]].xs(perm_pair[1], level=1)

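# Toy demonstration of the rolling-correlation helpers on synthetic data
# (purely illustrative; the real data is loaded below): three series of random
# weekly returns give three pairwise rolling correlation columns.
toy_returns = pd.DataFrame(np.random.randn(300, 3), columns=['a', 'b', 'c'],
                           index=pd.date_range('2000-01-01', periods=300, freq='W'))
print(get_rolling_correlations(toy_returns, 52).tail())
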
use_returns = subsys_returns_df
use_returns = use_returns[datetime.datetime(1998, 1, 1):]  # common timestamp
use_returns = use_returns.resample("5B").sum()  # good enough approx for weekly returns

import matplotlib.pyplot as pyplot
pyplot.rcParams.update({'font.size': 16})

#get_rolling_correlations(use_returns, 52).plot()

Nweeks_list = [4, 7, 13, 26, 52, 104, 208, 520]
Nweeks_forward = 52  # use 13 weeks forward for underlying returns, 52 for others

import statsmodels.api as sm

r_squared = []
for Nweeks_back in Nweeks_list:
    print(Nweeks_back)
    pd_result = get_forecast_and_future_corr(Nweeks_back, Nweeks_forward)

    pd_result = pd_result.replace([np.inf, -np.inf], np.nan)
    pd_result = pd_result.dropna()
    x = pd_result.forecast
    x = sm.add_constant(x)
    y = pd_result.turnout
    est = sm.OLS(y, x).fit()
    r2 = est.rsquared_adj
    r_squared.append(r2)

ax = pyplot.gca()
ax.scatter(Nweeks_list, r_squared)
ax.set_xscale('log')

## for trading rules it's a bit more complex
Nweeks_forward = 52  # use 13 weeks for underlying returns, 52 for others
r_squared = []
for Nweeks_back in Nweeks_list:
    print(Nweeks_back)
    all_r2 = []
    for instrument_code in instrument_list:
        print("%s/%s" % (instrument_code, Nweeks_back))
        use_returns = get_rule_returns(instrument_code)
        use_returns = use_returns[datetime.datetime(1998, 1, 1):]  # common timestamp
        use_returns = use_returns.resample("5B").sum()  # good enough approx for weekly returns

        pd_result = get_forecast_and_future_corr(Nweeks_back, Nweeks_forward)
        pd_result = pd_result.replace([np.inf, -np.inf], np.nan)
        pd_result = pd_result.dropna()
        x = pd_result.forecast
        x = sm.add_constant(x)
        y = pd_result.turnout
        est = sm.OLS(y, x).fit()
        r2_this_instr = est.rsquared_adj
        all_r2.append(r2_this_instr)

    r2 = np.mean(all_r2)
    r_squared.append(r2)

ax = pyplot.gca()
ax.scatter(Nweeks_list, r_squared)
ax.set_xscale('log')