Created
December 4, 2020 10:13
-
-
Save robcarver17/61fd128d4210a27b20b7358a3efed7f0 to your computer and use it in GitHub Desktop.
Revisions
-
robcarver17 created this gist
Dec 4, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,523 @@ """ The starter system has the following features: - single market - binary forecast from simple MAV - exit from trailing stop loss - fixed positions once in trade """ import matplotlib matplotlib.use("TkAgg") from systems.defaults import system_defaults from syscore.genutils import sign from systems.provided.futures_chapter15.basesystem import * from sysdata.configdata import Config from systems.forecasting import TradingRule from systems.positionsizing import PositionSizing from systems.system_cache import diagnostic, output from copy import copy import numpy as np import pandas as pd from random import getrandbits import matplotlib.pylab as plt def simple_mav(price, short=16, long=64, forecast_fixed=10): """ Simple moving average crossover :param price: :param short: days for short :param long: days for short :return: binary time series """ short_mav = price.rolling(short, min_periods=1).mean() long_mav = price.rolling(long, min_periods=1).mean() signal = short_mav - long_mav binary = signal.apply(sign) binary_position = forecast_fixed * binary return binary_position class simpleSysystemPosition(object): def __init__(self, dynamic_vol=False, dynamic_SL = False): self.current_position = 0.0 self.previous_position = 0.0 self.dynamic_vol = dynamic_vol self.dynamic_SL = dynamic_SL def no_position_check_for_trade(self, original_position_now, current_price, current_vol): assert self.no_current_position if np.isnan(original_position_now): # no signal return 0.0 if original_position_now ==0.0: return 0.0 # potentially going long / short # check last position to avoid whipsaw if self.previous_position != 0.0: # same way round avoid whipsaw if sign( original_position_now) == sign(self.previous_position): return 0.0 self.initialise_trade(original_position_now, current_price, current_vol) return original_position_now @property def no_current_position(self): return self.current_position==0.0 def initialise_trade(self, original_position_now, current_price, current_vol): # okay to do this - we don't want to enter a new position unless sign changed # we set the position at the sized position at moment of # inception self.current_position = original_position_now self.price_list_since_position_held = [current_price] self.initial_vol = current_vol self.initial_position = original_position_now return original_position_now def position_on_check_for_close(self, current_price, current_vol): assert not self.no_current_position # already holding a position # calculate HWM self.update_price_series(current_price) new_position = self.vol_adjusted_position(current_vol) time_to_close_trade =self.check_if_hit_stoploss(current_vol) if time_to_close_trade: self.close_trade() return new_position def update_price_series(self, current_price): price_list_since_position_held = self.price_list_since_position_held price_list_since_position_held.append(current_price) def vol_adjusted_position(self, current_vol): initial_position = self.initial_position if self.dynamic_vol: vol_adjusted_position = (self.initial_vol / current_vol) * initial_position return vol_adjusted_position else: return initial_position def check_if_hit_stoploss(self, current_vol): stoploss_gap = self.stoploss_gap(current_vol) sign_position = sign(self.current_position) if sign_position == 1: # long time_to_close_trade = self.check_if_long_stop_hit(stoploss_gap) else: # short time_to_close_trade = self.check_if_short_stop_hit(stoploss_gap) return time_to_close_trade def stoploss_gap(self, current_vol): xfactor = self.Xfactor if self.dynamic_vol: vol = current_vol else: vol = self.initial_vol stoploss_gap = vol * xfactor return stoploss_gap @property def Xfactor(self): if self.dynamic_SL: return self.dynamic_xfactor() else: return fixed_xfactor() def dynamic_xfactor(self): pandl_vol_units = self.vol_adjusted_profit_since_trade_points() return dynamic_xfactor(pandl_vol_units) def vol_adjusted_profit_since_trade_points(self): if self.no_current_position: return 0.0 initial_vol = self.initial_vol profit_price_units = self.profit_since_trade_points() return profit_price_units / initial_vol def profit_since_trade_points(self): assert not self.no_current_position current_position = self.current_position if current_position>0: return self.current_price - self.initial_price else: return self.initial_price - self.current_price @property def current_price(self): price_list_since_position_held = self.price_list_since_position_held current_price = price_list_since_position_held[-1] return current_price @property def initial_price(self): price_list_since_position_held = self.price_list_since_position_held initial_price = price_list_since_position_held[0] return initial_price def check_if_long_stop_hit(self, stoploss_gap): threshold = self.hwm - stoploss_gap time_to_close_trade = self.current_price < threshold return time_to_close_trade def check_if_short_stop_hit(self, stoploss_gap): threshold = self.hwm + stoploss_gap time_to_close_trade = self.current_price > threshold return time_to_close_trade @property def hwm(self): current_position = self.current_position if current_position > 0: return self.hwm_when_long() else: return self.hwm_when_short() def hwm_when_long(self): price_list_since_position_held = self.price_list_since_position_held hwm = np.nanmax(price_list_since_position_held) return hwm def hwm_when_short(self): price_list_since_position_held = self.price_list_since_position_held hwm = np.nanmin(price_list_since_position_held) return hwm def close_trade(self): self.previous_position = copy(self.current_position) self.current_position = 0.0 self.price_list_since_position_held = [] del(self.initial_vol) del(self.initial_position) def fixed_xfactor(): return 8.0 def dynamic_xfactor(pandl_vol_units): MINIMUM_XFACTOR = 2.0 MAXIMUM_XFACTOR = 8.0 PANDL_UPPER_CUTOFF = 8.0 PANDL_LOWER_CUTOFF = 0.0 if pandl_vol_units<=PANDL_LOWER_CUTOFF: return MINIMUM_XFACTOR elif pandl_vol_units>PANDL_UPPER_CUTOFF: return MAXIMUM_XFACTOR else: return MINIMUM_XFACTOR + (pandl_vol_units)*(MAXIMUM_XFACTOR - MINIMUM_XFACTOR)/(PANDL_UPPER_CUTOFF - PANDL_LOWER_CUTOFF) def stoploss(price, vol, raw_position, dynamic_vol=False, dynamic_SL = False): """ Apply trailing stoploss :param price: :param vol: eg system.rawdata.daily_returns_volatility("SP500") :param raw_position: Raw position series, without stoploss or entry / exit logic :return: New position series """ assert all(vol.index == price.index) assert all(price.index == raw_position.index) # assume all lined up simple_system_position = simpleSysystemPosition( dynamic_vol=dynamic_vol, dynamic_SL=dynamic_SL) new_position_list = [] for iday in range(len(price)): current_price = price[iday] current_vol = vol[iday] if simple_system_position.no_current_position: # no position, check for signal original_position_now = raw_position[iday] new_position = simple_system_position.no_position_check_for_trade(original_position_now, current_price, current_vol) else: new_position = simple_system_position.position_on_check_for_close( current_price, current_vol) new_position_list.append(new_position) new_position_df = pd.Series(new_position_list, raw_position.index) return new_position_df class PositionSizeWithStopLoss(PositionSizing): @diagnostic() def get_subsystem_position_preliminary(self, instrument_code): """ Get scaled position (assuming for now we trade our entire capital for one instrument) """ self.log.msg( "Calculating subsystem position for %s" % instrument_code, instrument_code=instrument_code, ) """ We don't allow this to be changed in config """ avg_abs_forecast = system_defaults["average_absolute_forecast"] vol_scalar = self.get_volatility_scalar(instrument_code) # forecast is binary + or - avg-abs-forecast forecast = self.get_combined_forecast(instrument_code) vol_scalar = vol_scalar.reindex(forecast.index).ffill() # put on a position according to vol and sign of forecast; this will only take effect on a new trade subsystem_position = vol_scalar * forecast / avg_abs_forecast return subsystem_position @output() def get_subsystem_position(self, instrument_code): """ Get scaled position (assuming for now we trade our entire capital for one instrument) """ price = self.parent.rawdata.get_daily_prices(instrument_code) vol = self.parent.rawdata.daily_returns_volatility(instrument_code) raw_position = self.get_subsystem_position_preliminary(instrument_code) subsystem_position = stoploss(price, vol, raw_position, dynamic_vol=self.parent.config.dynamic_vol, dynamic_SL = self.parent.config.dynamic_SL) return subsystem_position simple_mav_rule = TradingRule( dict(function=simple_mav, other_args=dict(long=40, short=10)) ) data = csvFuturesSimData() ## trade by trade p&l ## only works at subsystem level def system_given_flags(dynamic_vol = False, dynamic_SL = False): config = Config( dict( trading_rules=dict(simple_mav=simple_mav_rule), percentage_vol_target=16.0, notional_trading_capital=100000000, dynamic_vol=dynamic_vol, dynamic_SL=dynamic_SL ) ) system = System( [ Account(), Portfolios(), PositionSizeWithStopLoss(), FuturesRawData(), ForecastCombine(), ForecastScaleCap(), Rules(simple_mav_rule), ], data, config, ) system.set_logging_level("on") return system def stats(stacked_returns): stacked_returns_pandl, stacked_trades_pandl = stacked_returns print("SR %f" % sharpe_for_stacked(stacked_returns_pandl)) print("Skew trades %f" % skew_for_stacked(stacked_trades_pandl)) print("Skew daily returns %f" % skew_for_stacked(stacked_returns_pandl)) print("Skew weekly returns %f" % skew_for_stacked(stacked_returns_pandl, period="1W")) print("Skew monthly returns %f" % skew_for_stacked(stacked_returns_pandl, period="1M")) def stacked_returns_over_instrument(system): return_list = [] for instrument in system.get_instrument_list(): returns = calc_returns_for_system_and_code(instrument, system) return_list.append(returns) stacked_returns_pandl = [x[0] for x in return_list] stacked_trades_pandl = [x[1] for x in return_list] return stacked_returns_pandl, stacked_trades_pandl def stack_to_df(stacked_returns, period=None): if period is not None: new_stacked_returns = [returns.resample(period).sum() for returns in stacked_returns] else: new_stacked_returns = stacked_returns df_returns = pd.concat(new_stacked_returns, axis=0) return df_returns def calc_returns_for_system_and_code(instrument_code: str, system: System): pandl_returns_capital = pandl_capital(instrument_code, system, method_used="returns") pandl_trades_capital = pandl_capital(instrument_code, system, method_used="trades") return pandl_returns_capital, pandl_trades_capital def sharpe_for_stacked(stacked_returns_pandl): sharpe_list = [sharpe(pandl_series_capital) for pandl_series_capital in stacked_returns_pandl] return np.median(sharpe_list) def sharpe(pandl_series_capital): avg = pandl_series_capital.mean() stdev = pandl_series_capital.std() return 16* avg / stdev def skew_for_stacked(stacked_returns_pandl, period="1B"): resampled = [pandl_series_capital.resample(period) for pandl_series_capital in stacked_returns_pandl] skew_list = [pandl_series_capital.skew() for pandl_series_capital in resampled] return np.median(skew_list) def pandl_capital(instrument_code: str, system: System, method_used: str = "returns"): pandl_series_money = pandl_money(instrument_code, system, method_used=method_used) capital = system.config.notional_trading_capital return pandl_series_money / capital def pandl_money(instrument_code: str, system: System, method_used: str= "returns"): pos_series = system.positionSize.get_subsystem_position(instrument_code) price_series = system.rawdata.get_daily_prices(instrument_code) block_size =system.data.get_value_of_block_price_move(instrument_code) fx =system.positionSize.get_fx_rate(instrument_code) ans = pandl_base(price_series, pos_series, block_size, fx, method_used = method_used) return ans def pandl_base(price_series: pd.Series, pos_series: pd.DataFrame, block_size: float, fx: pd.Series, method_used = "returns"): pandl_series_local = pandl_local_ccy(price_series, pos_series, block_size, method_used = method_used) fx_matching = fx.reindex(pandl_series_local.index).ffill() return pandl_series_local * fx_matching def pandl_local_ccy(price_series, pos_series, block_size, method_used = "returns"): if method_used == "returns": pandl_series_points =pandl_returns_points(price_series, pos_series) else: ## trades pandl_series_points = pandl_trades_points(price_series, pos_series) return pandl_series_points * block_size def pandl_returns_points(price_series, pos_series): """ Calculate pandl for an individual position :param price: price series :type price: Tx1 pd.Series :param trade_series: set of trades done NOT always aligned to price can be length 0 :type trade_series: Tx2 pd.DataFrame columns ['qty', 'price'] :param pos_series: series of positions NOT ALWAYS aligned to price :type pos_series: Tx1 pd.Series :returns: pd.Series """ # want to have both kinds of price price_series = price_series.reindex(pos_series.index, method="ffill") price_returns = price_series.diff() returns = pos_series.shift(1) * price_returns return returns def pandl_trades_points(price_series, pos_series): """ :returns: pd.Series, only dates when we're trading """ # want to have both kinds of price returns = pandl_returns_points(price_series, pos_series) trade_dates_idx = get_trade_dates_idx_from_pos_series(pos_series) trade_pandl = [sum_between_trade_dates(trade_dates_idx, idx, returns) for idx in range(len(trade_dates_idx))] trade_returns = pd.Series(trade_pandl, index = returns.index[trade_dates_idx]) return trade_returns def get_trade_dates_idx_from_pos_series(pos_series): prev_position = pos_series.shift(1) trade_dates_idx = [idx for idx, _index_date in enumerate(list(pos_series.index)) if traded(pos_series, prev_position, idx)] if trade_dates_idx[-1]<len(pos_series)-1: trade_dates_idx.append(len(pos_series)-1) return trade_dates_idx def traded(pos_series, prev_position, idx): if sign(pos_series[idx]) != sign(prev_position[idx]): return True else: return False def sum_between_trade_dates(trade_dates_idx, idx, returns): if idx==0: previous_date_idx = 0 else: previous_date_idx = trade_dates_idx[idx-1] current_idx = trade_dates_idx[idx] return returns[previous_date_idx: current_idx].sum() system = system_given_flags(dynamic_vol=True, dynamic_SL=True) stacked_returns = stacked_returns_over_instrument(system) stats(stacked_returns)