Created
December 4, 2020 10:13
-
-
Save robcarver17/61fd128d4210a27b20b7358a3efed7f0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
The starter system has the following features: | |
- single market | |
- binary forecast from simple MAV | |
- exit from trailing stop loss | |
- fixed positions once in trade | |
""" | |
import matplotlib | |
matplotlib.use("TkAgg") | |
from systems.defaults import system_defaults | |
from syscore.genutils import sign | |
from systems.provided.futures_chapter15.basesystem import * | |
from sysdata.configdata import Config | |
from systems.forecasting import TradingRule | |
from systems.positionsizing import PositionSizing | |
from systems.system_cache import diagnostic, output | |
from copy import copy | |
import numpy as np | |
import pandas as pd | |
from random import getrandbits | |
import matplotlib.pylab as plt | |
def simple_mav(price, short=16, long=64, forecast_fixed=10):
    """
    Binary moving-average crossover forecast.

    :param price: price series (anything supporting ``.rolling``)
    :param short: window length of the fast moving average
    :param long: window length of the slow moving average
    :param forecast_fixed: magnitude given to the forecast
    :return: series equal to ``forecast_fixed`` times the sign of
        (fast average - slow average)
    """
    # min_periods=1 means an average is produced from the first observation on
    fast_average = price.rolling(short, min_periods=1).mean()
    slow_average = price.rolling(long, min_periods=1).mean()

    # ``sign`` is the project helper from syscore.genutils, applied per element
    crossover_direction = (fast_average - slow_average).apply(sign)

    return forecast_fixed * crossover_direction
class simpleSysystemPosition(object):
    """
    Tracks a single position at a time and applies a trailing stop loss.

    The object is "flat" while ``current_position == 0.0``. When a trade is
    opened it records the entry position, the entry volatility and every price
    seen since entry; the trailing stop is measured from the best price in that
    list. ``previous_position`` remembers the last closed position so that a new
    trade in the same direction is refused.

    :param dynamic_vol: if True, position size and stop gap use the current
        volatility rather than the volatility at entry
    :param dynamic_SL: if True, the stop multiplier comes from the module-level
        ``dynamic_xfactor``; otherwise from ``fixed_xfactor``
    """

    def __init__(self, dynamic_vol=False, dynamic_SL=False):
        self.current_position = 0.0
        self.previous_position = 0.0
        self.dynamic_vol = dynamic_vol
        self.dynamic_SL = dynamic_SL

    # ------------------------------------------------------------------
    # Flat: decide whether to open a trade
    # ------------------------------------------------------------------
    @property
    def no_current_position(self):
        """True while no trade is open."""
        return self.current_position == 0.0

    def no_position_check_for_trade(self, original_position_now, current_price, current_vol):
        """
        While flat, decide whether to open a trade at the requested position.

        :return: the position taken: ``original_position_now`` if a trade was
            opened, otherwise 0.0
        """
        assert self.no_current_position

        # NaN means no signal, zero means nothing to do: stay flat either way
        if np.isnan(original_position_now) or original_position_now == 0.0:
            return 0.0

        # Refuse to re-enter in the direction of the trade we last closed
        last_closed = self.previous_position
        if last_closed != 0.0 and sign(original_position_now) == sign(last_closed):
            return 0.0

        self.initialise_trade(original_position_now, current_price, current_vol)
        return original_position_now

    def initialise_trade(self, original_position_now, current_price, current_vol):
        """Record the state of a newly opened trade and return its position."""
        # The position is fixed at the size requested at the moment of inception
        self.current_position = original_position_now
        self.initial_position = original_position_now
        self.initial_vol = current_vol
        self.price_list_since_position_held = [current_price]

        return original_position_now

    # ------------------------------------------------------------------
    # In a trade: update, size, and check the stop
    # ------------------------------------------------------------------
    def position_on_check_for_close(self, current_price, current_vol):
        """
        While in a trade, record the new price and close the trade if stopped.

        :return: the (possibly vol-adjusted) position for this bar
        """
        assert not self.no_current_position

        self.update_price_series(current_price)

        # Size must be computed before any close, which deletes the entry state
        position_for_bar = self.vol_adjusted_position(current_vol)

        # NOTE(review): when the stop is hit the trade is closed internally but
        # this bar still returns the held position, not 0.0 - confirm intended.
        if self.check_if_hit_stoploss(current_vol):
            self.close_trade()

        return position_for_bar

    def update_price_series(self, current_price):
        """Append the latest price to the prices seen since entry."""
        self.price_list_since_position_held.append(current_price)

    def vol_adjusted_position(self, current_vol):
        """Entry position, rescaled by entry vol / current vol if dynamic_vol."""
        entry_position = self.initial_position
        if not self.dynamic_vol:
            return entry_position

        return (self.initial_vol / current_vol) * entry_position

    def check_if_hit_stoploss(self, current_vol):
        """Return whether the trailing stop has been breached on this bar."""
        gap = self.stoploss_gap(current_vol)

        if sign(self.current_position) == 1:
            return self.check_if_long_stop_hit(gap)

        return self.check_if_short_stop_hit(gap)

    def stoploss_gap(self, current_vol):
        """Stop distance in price units: volatility times the stop multiplier."""
        multiplier = self.Xfactor
        reference_vol = current_vol if self.dynamic_vol else self.initial_vol

        return reference_vol * multiplier

    @property
    def Xfactor(self):
        """Stop multiplier: dynamic if dynamic_SL is set, otherwise fixed."""
        return self.dynamic_xfactor() if self.dynamic_SL else fixed_xfactor()

    def dynamic_xfactor(self):
        """Stop multiplier from the module-level dynamic_xfactor function."""
        # Inside this body the bare name resolves to the module-level function,
        # not to this method.
        return dynamic_xfactor(self.vol_adjusted_profit_since_trade_points())

    # ------------------------------------------------------------------
    # Profit and price bookkeeping
    # ------------------------------------------------------------------
    def vol_adjusted_profit_since_trade_points(self):
        """Profit since entry in units of entry volatility; 0.0 when flat."""
        if self.no_current_position:
            return 0.0

        entry_vol = self.initial_vol
        return self.profit_since_trade_points() / entry_vol

    def profit_since_trade_points(self):
        """Price move since entry, signed so that a gain is positive."""
        assert not self.no_current_position

        if self.current_position > 0:
            return self.current_price - self.initial_price

        return self.initial_price - self.current_price

    @property
    def current_price(self):
        """Most recent price recorded since entry."""
        return self.price_list_since_position_held[-1]

    @property
    def initial_price(self):
        """Price recorded when the trade was opened."""
        return self.price_list_since_position_held[0]

    def check_if_long_stop_hit(self, stoploss_gap):
        """True if the price has fallen more than the gap below the high."""
        return self.current_price < self.hwm - stoploss_gap

    def check_if_short_stop_hit(self, stoploss_gap):
        """True if the price has risen more than the gap above the low."""
        return self.current_price > self.hwm + stoploss_gap

    @property
    def hwm(self):
        """Best price since entry: the maximum if long, the minimum otherwise."""
        return self.hwm_when_long() if self.current_position > 0 else self.hwm_when_short()

    def hwm_when_long(self):
        """Highest price since entry, ignoring NaN."""
        return np.nanmax(self.price_list_since_position_held)

    def hwm_when_short(self):
        """Lowest price since entry, ignoring NaN."""
        return np.nanmin(self.price_list_since_position_held)

    # ------------------------------------------------------------------
    # Closing
    # ------------------------------------------------------------------
    def close_trade(self):
        """Go flat, remembering the closed position and discarding entry state."""
        self.previous_position = copy(self.current_position)
        self.current_position = 0.0
        self.price_list_since_position_held = []

        # Entry state is deleted outright, so reading it while flat raises
        del self.initial_vol
        del self.initial_position
def fixed_xfactor():
    """
    Stop-loss multiplier used when the stop is not dynamic.

    :return: the constant 8.0
    """
    static_multiplier = 8.0
    return static_multiplier
def dynamic_xfactor(
    pandl_vol_units,
    *,
    minimum_xfactor=2.0,
    maximum_xfactor=8.0,
    pandl_lower_cutoff=0.0,
    pandl_upper_cutoff=8.0,
):
    """
    Stop-loss multiplier that widens linearly as profit grows.

    At or below the lower cutoff the minimum multiplier is returned; above the
    upper cutoff the maximum multiplier is returned; in between the multiplier
    is linearly interpolated between the two.

    :param pandl_vol_units: profit since trade entry, in volatility units
    :param minimum_xfactor: multiplier at or below ``pandl_lower_cutoff``
    :param maximum_xfactor: multiplier above ``pandl_upper_cutoff``
    :param pandl_lower_cutoff: profit level at which interpolation starts
    :param pandl_upper_cutoff: profit level at which interpolation ends
    :return: the multiplier; NaN input fails both comparisons and so
        propagates through the interpolation as NaN
    """
    if pandl_vol_units <= pandl_lower_cutoff:
        return minimum_xfactor

    if pandl_vol_units > pandl_upper_cutoff:
        return maximum_xfactor

    # Measure the distance from the lower cutoff, not from zero, so the
    # interpolation stays correct when the lower cutoff is non-zero.
    distance_above_lower = pandl_vol_units - pandl_lower_cutoff
    xfactor_range = maximum_xfactor - minimum_xfactor
    cutoff_range = pandl_upper_cutoff - pandl_lower_cutoff

    return minimum_xfactor + distance_above_lower * xfactor_range / cutoff_range
def stoploss(price, vol, raw_position, dynamic_vol=False, dynamic_SL=False):
    """
    Apply a trailing stop loss to a raw position series.

    The three inputs are walked bar by bar through a ``simpleSysystemPosition``
    tracker: while flat, the raw position is offered as a possible new trade;
    while in a trade, the tracker updates its price history and checks the stop.

    :param price: price series
    :param vol: volatility series, eg system.rawdata.daily_returns_volatility("SP500")
    :param raw_position: raw position series, without stoploss or entry / exit logic
    :param dynamic_vol: passed through to ``simpleSysystemPosition``
    :param dynamic_SL: passed through to ``simpleSysystemPosition``
    :return: new position series, indexed like ``raw_position``
    :raises AssertionError: if the three indexes are not element-wise equal
    """
    assert all(vol.index == price.index)
    assert all(price.index == raw_position.index)

    tracker = simpleSysystemPosition(dynamic_vol=dynamic_vol, dynamic_SL=dynamic_SL)

    new_position_list = []

    # Iterate over the underlying values in lockstep. Indexing a Series with a
    # bare integer (``price[iday]``) relies on a positional fallback that is
    # deprecated for non-integer indexes and is label-based for integer ones.
    for current_price, current_vol, original_position_now in zip(
        price.values, vol.values, raw_position.values
    ):
        if tracker.no_current_position:
            # no position, check for signal
            new_position = tracker.no_position_check_for_trade(
                original_position_now, current_price, current_vol
            )
        else:
            new_position = tracker.position_on_check_for_close(current_price, current_vol)

        new_position_list.append(new_position)

    return pd.Series(new_position_list, raw_position.index)
class PositionSizeWithStopLoss(PositionSizing):
    """
    Position sizing stage whose subsystem position is passed through the
    trailing ``stoploss`` function.
    """

    @diagnostic()
    def get_subsystem_position_preliminary(self, instrument_code):
        """
        Scaled position before any stop loss is applied (assuming for now we
        trade our entire capital for one instrument).

        :param instrument_code: instrument to size
        :return: volatility scalar times forecast, divided by the average
            absolute forecast
        """
        self.log.msg(
            "Calculating subsystem position for %s" % instrument_code,
            instrument_code=instrument_code,
        )

        # We don't allow this to be changed in config: it always comes from
        # the system defaults.
        average_forecast = system_defaults["average_absolute_forecast"]

        raw_vol_scalar = self.get_volatility_scalar(instrument_code)

        # forecast is binary + or - avg-abs-forecast
        forecast = self.get_combined_forecast(instrument_code)

        # Align the scalar to the forecast dates, carrying the last value forward
        aligned_vol_scalar = raw_vol_scalar.reindex(forecast.index).ffill()

        # Position according to vol and sign of forecast; this will only take
        # effect on a new trade
        return aligned_vol_scalar * forecast / average_forecast

    @output()
    def get_subsystem_position(self, instrument_code):
        """
        Scaled position with the trailing stop loss applied (assuming for now
        we trade our entire capital for one instrument).

        :param instrument_code: instrument to size
        :return: position series returned by ``stoploss``
        """
        rawdata = self.parent.rawdata
        daily_price = rawdata.get_daily_prices(instrument_code)
        daily_vol = rawdata.daily_returns_volatility(instrument_code)

        unstopped_position = self.get_subsystem_position_preliminary(instrument_code)

        # The two flags are read from the system config
        config = self.parent.config
        return stoploss(
            daily_price,
            daily_vol,
            unstopped_position,
            dynamic_vol=config.dynamic_vol,
            dynamic_SL=config.dynamic_SL,
        )
# Trading rule wrapping simple_mav with a 10 / 40 short / long window pair
simple_mav_rule = TradingRule(
    {"function": simple_mav, "other_args": {"long": 40, "short": 10}}
)

# Data source handed to every system built by system_given_flags
data = csvFuturesSimData()
## trade by trade p&l | |
## only works at subsystem level | |
def system_given_flags(dynamic_vol=False, dynamic_SL=False):
    """
    Build a System using the stop-loss position sizing stage.

    :param dynamic_vol: stored in the config as ``dynamic_vol``
    :param dynamic_SL: stored in the config as ``dynamic_SL``
    :return: the constructed System, with logging level set to "on"
    """
    config_elements = {
        "trading_rules": {"simple_mav": simple_mav_rule},
        "percentage_vol_target": 16.0,
        "notional_trading_capital": 100000000,
        "dynamic_vol": dynamic_vol,
        "dynamic_SL": dynamic_SL,
    }
    config = Config(config_elements)

    stage_list = [
        Account(),
        Portfolios(),
        PositionSizeWithStopLoss(),
        FuturesRawData(),
        ForecastCombine(),
        ForecastScaleCap(),
        Rules(simple_mav_rule),
    ]

    # ``data`` is the module-level simulation data object
    system = System(stage_list, data, config)
    system.set_logging_level("on")

    return system
def stats(stacked_returns):
    """
    Print the Sharpe ratio and skew figures for a set of stacked P&L.

    :param stacked_returns: pair ``(returns_pandl, trades_pandl)``, each a list
        of P&L series
    :return: None; the figures are printed to stdout
    """
    per_period_pandl, per_trade_pandl = stacked_returns

    # Each statistic is computed lazily, immediately before it is printed, so
    # the lines are produced in the same interleaved order as separate prints.
    report = [
        ("SR %f", lambda: sharpe_for_stacked(per_period_pandl)),
        ("Skew trades %f", lambda: skew_for_stacked(per_trade_pandl)),
        ("Skew daily returns %f", lambda: skew_for_stacked(per_period_pandl)),
        ("Skew weekly returns %f", lambda: skew_for_stacked(per_period_pandl, period="1W")),
        ("Skew monthly returns %f", lambda: skew_for_stacked(per_period_pandl, period="1M")),
    ]

    for template, compute in report:
        print(template % compute())
def stacked_returns_over_instrument(system):
    """
    Collect per-instrument P&L for every instrument in the system.

    :param system: object providing ``get_instrument_list()``
    :return: tuple ``(returns_pandl, trades_pandl)`` of two lists, each holding
        one entry per instrument in instrument-list order
    """
    per_instrument = [
        calc_returns_for_system_and_code(instrument_code, system)
        for instrument_code in system.get_instrument_list()
    ]

    # Each element is a (returns P&L, trades P&L) pair; split into two lists
    returns_pandl = [pair[0] for pair in per_instrument]
    trades_pandl = [pair[1] for pair in per_instrument]

    return returns_pandl, trades_pandl
def stack_to_df(stacked_returns, period=None):
    """
    Concatenate a list of return series end to end.

    :param stacked_returns: list of time-indexed series
    :param period: optional resample frequency; when given, each series is
        first summed within that period
    :return: the row-wise concatenation of the (optionally resampled) series
    """
    if period is None:
        to_concat = stacked_returns
    else:
        to_concat = [series.resample(period).sum() for series in stacked_returns]

    return pd.concat(to_concat, axis=0)
def calc_returns_for_system_and_code(instrument_code: str, system: System):
    """
    P&L as a fraction of capital for one instrument, computed both ways.

    :param instrument_code: instrument to evaluate
    :param system: system to take positions, prices and capital from
    :return: tuple ``(returns P&L, trades P&L)``
    """
    # "returns" first, then "trades": the order callers unpack in
    return tuple(
        pandl_capital(instrument_code, system, method_used=method)
        for method in ("returns", "trades")
    )
def sharpe_for_stacked(stacked_returns_pandl):
    """
    Median Sharpe ratio across a list of P&L series.

    :param stacked_returns_pandl: list of P&L series, one per instrument
    :return: median of ``sharpe`` applied to each series
    """
    per_series_sharpe = list(map(sharpe, stacked_returns_pandl))
    return np.median(per_series_sharpe)
def sharpe(pandl_series_capital):
    """
    Sharpe ratio of a P&L series: 16 times mean divided by standard deviation.

    :param pandl_series_capital: P&L series supporting ``.mean()`` and ``.std()``
    :return: the scaled ratio
    """
    # NOTE(review): 16 is presumably sqrt(256 business days) - confirm
    annualisation = 16
    mean_return = pandl_series_capital.mean()
    volatility = pandl_series_capital.std()

    return annualisation * mean_return / volatility
def skew_for_stacked(stacked_returns_pandl, period="1B"):
    """
    Median skew across a list of P&L series, measured at a given frequency.

    Each series is first aggregated by summing P&L within every ``period``
    bucket, and the skew is then taken over the aggregated series. Calling
    ``.skew()`` directly on the resampler, without aggregating first, does not
    give the skew of the period P&L.

    :param stacked_returns_pandl: list of time-indexed P&L series
    :param period: resample frequency, business daily by default
    :return: median of the per-series skews
    """
    skew_list = [
        # min_count=1 leaves empty buckets as NaN, which skew() skips, rather
        # than inserting zero P&L for periods with no observations
        pandl_series_capital.resample(period).sum(min_count=1).skew()
        for pandl_series_capital in stacked_returns_pandl
    ]

    return np.median(skew_list)
def pandl_capital(instrument_code: str, system: System, method_used: str = "returns"):
    """
    P&L for one instrument expressed as a fraction of notional capital.

    :param instrument_code: instrument to evaluate
    :param system: system supplying the P&L inputs and the capital
    :param method_used: "returns" or anything else for trades, passed through
    :return: money P&L divided by ``system.config.notional_trading_capital``
    """
    money_pandl = pandl_money(instrument_code, system, method_used=method_used)
    notional = system.config.notional_trading_capital

    return money_pandl / notional
def pandl_money(instrument_code: str, system: System, method_used: str = "returns"):
    """
    P&L for one instrument after applying block size and the FX rate.

    :param instrument_code: instrument to evaluate
    :param system: system supplying positions, prices, block size and FX rate
    :param method_used: "returns" or anything else for trades, passed through
    :return: the series returned by ``pandl_base``
    """
    positions = system.positionSize.get_subsystem_position(instrument_code)
    prices = system.rawdata.get_daily_prices(instrument_code)
    value_per_point = system.data.get_value_of_block_price_move(instrument_code)
    fx_rate = system.positionSize.get_fx_rate(instrument_code)

    return pandl_base(prices, positions, value_per_point, fx_rate, method_used=method_used)
def pandl_base(price_series: pd.Series, pos_series: pd.DataFrame, block_size: float, fx: pd.Series,
               method_used="returns"):
    """
    Convert local-currency P&L using an FX rate series.

    :param price_series: price series
    :param pos_series: position series
    :param block_size: multiplier applied to P&L measured in price points
    :param fx: FX rate series
    :param method_used: "returns" or anything else for trades, passed through
    :return: local-currency P&L multiplied by the aligned FX rate
    """
    local_ccy_pandl = pandl_local_ccy(
        price_series, pos_series, block_size, method_used=method_used
    )

    # Align FX to the P&L dates, carrying the last known rate forward
    fx_aligned = fx.reindex(local_ccy_pandl.index).ffill()

    return local_ccy_pandl * fx_aligned
def pandl_local_ccy(price_series, pos_series, block_size, method_used="returns"):
    """
    P&L in local currency: P&L in price points times the block size.

    :param price_series: price series
    :param pos_series: position series
    :param block_size: multiplier applied to P&L measured in price points
    :param method_used: "returns" for per-period P&L; any other value gives
        per-trade P&L
    :return: points P&L multiplied by ``block_size``
    """
    points_calculator = (
        pandl_returns_points if method_used == "returns" else pandl_trades_points
    )

    return points_calculator(price_series, pos_series) * block_size
def pandl_returns_points(price_series, pos_series):
    """
    Per-period P&L in price points for a position series.

    :param price_series: price series, NOT necessarily aligned to positions
    :type price_series: Tx1 pd.Series
    :param pos_series: series of positions
    :type pos_series: Tx1 pd.Series
    :returns: pd.Series on the position index; the first value is NaN
    """
    # Put prices on the position dates, carrying the last price forward
    aligned_price = price_series.reindex(pos_series.index, method="ffill")

    # The position held at the previous date earns the price change to this date
    return pos_series.shift(1) * aligned_price.diff()
def pandl_trades_points(price_series, pos_series):
    """
    Per-trade P&L in price points.

    :param price_series: price series
    :param pos_series: position series
    :returns: pd.Series, only dates when we're trading
    """
    period_points = pandl_returns_points(price_series, pos_series)
    trade_idx = get_trade_dates_idx_from_pos_series(pos_series)

    # One summed P&L per trade date
    per_trade_points = [
        sum_between_trade_dates(trade_idx, trade_number, period_points)
        for trade_number, _ in enumerate(trade_idx)
    ]

    return pd.Series(per_trade_points, index=period_points.index[trade_idx])
def get_trade_dates_idx_from_pos_series(pos_series):
    """
    Positional indexes of the rows of ``pos_series`` on which a trade occurred.

    A row counts as a trade when ``traded`` reports a change against the
    previous row. The final row is always included, so that P&L accrued after
    the last detected trade is still attributed to something.

    :param pos_series: position series
    :return: ascending list of integer row positions; empty for an empty series
    """
    n_rows = len(pos_series)
    if n_rows == 0:
        # Nothing to index; avoids appending a meaningless -1 below
        return []

    prev_position = pos_series.shift(1)
    trade_dates_idx = [
        idx for idx in range(n_rows) if traded(pos_series, prev_position, idx)
    ]

    # Guard the empty list as well: with no detected trade there is no last
    # element to compare against, and the final row must still be added.
    final_idx = n_rows - 1
    if not trade_dates_idx or trade_dates_idx[-1] < final_idx:
        trade_dates_idx.append(final_idx)

    return trade_dates_idx
def traded(pos_series, prev_position, idx):
    """
    Whether the position changed direction at row ``idx``.

    :param pos_series: position series
    :param prev_position: the position series shifted by one row
    :param idx: integer row position to inspect
    :return: True if the sign of the position differs from the sign of the
        previous position at that row, else False
    """
    # ``idx`` is a row position, so use .iloc: a bare integer key on a Series
    # is label-based for integer indexes and a deprecated positional fallback
    # for other index types.
    current_sign = sign(pos_series.iloc[idx])
    previous_sign = sign(prev_position.iloc[idx])

    return bool(current_sign != previous_sign)
def sum_between_trade_dates(trade_dates_idx, idx, returns):
    """
    Sum the returns in the window that ends at trade number ``idx``.

    :param trade_dates_idx: ascending list of integer row positions of trades
    :param idx: which entry of ``trade_dates_idx`` closes the window
    :param returns: per-period returns series
    :return: sum of ``returns`` from the previous trade row (or row 0 for the
        first trade) up to but not including the current trade row
    """
    window_start = trade_dates_idx[idx - 1] if idx != 0 else 0
    window_end = trade_dates_idx[idx]

    # NOTE(review): the slice is half-open, so the return on the current trade
    # row is excluded and the one on the previous trade row is included -
    # confirm this is the intended attribution.
    return returns[window_start:window_end].sum()
# Run: build the system with both dynamic flags switched on, gather the
# per-instrument P&L, and print the summary statistics.
system = system_given_flags(dynamic_SL=True, dynamic_vol=True)
stacked_returns = stacked_returns_over_instrument(system)
stats(stacked_returns)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment