Skip to content

Instantly share code, notes, and snippets.

@robcarver17
Created December 4, 2020 10:13
Show Gist options
  • Save robcarver17/61fd128d4210a27b20b7358a3efed7f0 to your computer and use it in GitHub Desktop.
Save robcarver17/61fd128d4210a27b20b7358a3efed7f0 to your computer and use it in GitHub Desktop.

Revisions

  1. robcarver17 created this gist Dec 4, 2020.
    523 changes: 523 additions & 0 deletions dynamic.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,523 @@
    """
    The starter system has the following features:
    - single market
    - binary forecast from simple MAV
    - exit from trailing stop loss
    - fixed positions once in trade
    """
    import matplotlib
    matplotlib.use("TkAgg")

    from systems.defaults import system_defaults
    from syscore.genutils import sign
    from systems.provided.futures_chapter15.basesystem import *
    from sysdata.configdata import Config
    from systems.forecasting import TradingRule
    from systems.positionsizing import PositionSizing
    from systems.system_cache import diagnostic, output


    from copy import copy
    import numpy as np
    import pandas as pd
    from random import getrandbits
    import matplotlib.pylab as plt



    def simple_mav(price, short=16, long=64, forecast_fixed=10):
    """
    Simple moving average crossover
    :param price:
    :param short: days for short
    :param long: days for short
    :return: binary time series
    """

    short_mav = price.rolling(short, min_periods=1).mean()
    long_mav = price.rolling(long, min_periods=1).mean()

    signal = short_mav - long_mav

    binary = signal.apply(sign)
    binary_position = forecast_fixed * binary

    return binary_position

    class simpleSysystemPosition(object):
    def __init__(self, dynamic_vol=False, dynamic_SL = False):
    self.current_position = 0.0
    self.previous_position = 0.0
    self.dynamic_vol = dynamic_vol
    self.dynamic_SL = dynamic_SL

    def no_position_check_for_trade(self, original_position_now, current_price, current_vol):
    assert self.no_current_position
    if np.isnan(original_position_now):
    # no signal
    return 0.0

    if original_position_now ==0.0:
    return 0.0

    # potentially going long / short
    # check last position to avoid whipsaw
    if self.previous_position != 0.0:
    # same way round avoid whipsaw
    if sign(
    original_position_now) == sign(self.previous_position):
    return 0.0

    self.initialise_trade(original_position_now, current_price, current_vol)
    return original_position_now

    @property
    def no_current_position(self):
    return self.current_position==0.0

    def initialise_trade(self, original_position_now, current_price, current_vol):
    # okay to do this - we don't want to enter a new position unless sign changed
    # we set the position at the sized position at moment of
    # inception
    self.current_position = original_position_now
    self.price_list_since_position_held = [current_price]
    self.initial_vol = current_vol
    self.initial_position = original_position_now

    return original_position_now

    def position_on_check_for_close(self, current_price, current_vol):
    assert not self.no_current_position

    # already holding a position
    # calculate HWM
    self.update_price_series(current_price)
    new_position = self.vol_adjusted_position(current_vol)

    time_to_close_trade =self.check_if_hit_stoploss(current_vol)

    if time_to_close_trade:
    self.close_trade()

    return new_position

    def update_price_series(self, current_price):
    price_list_since_position_held = self.price_list_since_position_held
    price_list_since_position_held.append(current_price)

    def vol_adjusted_position(self, current_vol):
    initial_position = self.initial_position
    if self.dynamic_vol:
    vol_adjusted_position = (self.initial_vol / current_vol) * initial_position
    return vol_adjusted_position
    else:
    return initial_position

    def check_if_hit_stoploss(self, current_vol):
    stoploss_gap = self.stoploss_gap(current_vol)

    sign_position = sign(self.current_position)
    if sign_position == 1:
    # long
    time_to_close_trade = self.check_if_long_stop_hit(stoploss_gap)
    else:
    # short
    time_to_close_trade = self.check_if_short_stop_hit(stoploss_gap)

    return time_to_close_trade

    def stoploss_gap(self, current_vol):
    xfactor = self.Xfactor

    if self.dynamic_vol:
    vol = current_vol
    else:
    vol = self.initial_vol

    stoploss_gap = vol * xfactor

    return stoploss_gap

    @property
    def Xfactor(self):
    if self.dynamic_SL:
    return self.dynamic_xfactor()
    else:
    return fixed_xfactor()

    def dynamic_xfactor(self):
    pandl_vol_units = self.vol_adjusted_profit_since_trade_points()
    return dynamic_xfactor(pandl_vol_units)

    def vol_adjusted_profit_since_trade_points(self):
    if self.no_current_position:
    return 0.0

    initial_vol = self.initial_vol
    profit_price_units = self.profit_since_trade_points()

    return profit_price_units / initial_vol

    def profit_since_trade_points(self):
    assert not self.no_current_position
    current_position = self.current_position
    if current_position>0:
    return self.current_price - self.initial_price
    else:
    return self.initial_price - self.current_price

    @property
    def current_price(self):
    price_list_since_position_held = self.price_list_since_position_held
    current_price = price_list_since_position_held[-1]

    return current_price

    @property
    def initial_price(self):
    price_list_since_position_held = self.price_list_since_position_held
    initial_price = price_list_since_position_held[0]

    return initial_price

    def check_if_long_stop_hit(self, stoploss_gap):
    threshold = self.hwm - stoploss_gap
    time_to_close_trade = self.current_price < threshold

    return time_to_close_trade

    def check_if_short_stop_hit(self, stoploss_gap):
    threshold = self.hwm + stoploss_gap
    time_to_close_trade = self.current_price > threshold

    return time_to_close_trade

    @property
    def hwm(self):
    current_position = self.current_position
    if current_position > 0:
    return self.hwm_when_long()
    else:
    return self.hwm_when_short()

    def hwm_when_long(self):
    price_list_since_position_held = self.price_list_since_position_held
    hwm = np.nanmax(price_list_since_position_held)

    return hwm

    def hwm_when_short(self):
    price_list_since_position_held = self.price_list_since_position_held
    hwm = np.nanmin(price_list_since_position_held)

    return hwm

    def close_trade(self):
    self.previous_position = copy(self.current_position)
    self.current_position = 0.0
    self.price_list_since_position_held = []
    del(self.initial_vol)
    del(self.initial_position)

    def fixed_xfactor():
    return 8.0

    def dynamic_xfactor(pandl_vol_units):
    MINIMUM_XFACTOR = 2.0
    MAXIMUM_XFACTOR = 8.0
    PANDL_UPPER_CUTOFF = 8.0
    PANDL_LOWER_CUTOFF = 0.0
    if pandl_vol_units<=PANDL_LOWER_CUTOFF:
    return MINIMUM_XFACTOR
    elif pandl_vol_units>PANDL_UPPER_CUTOFF:
    return MAXIMUM_XFACTOR
    else:
    return MINIMUM_XFACTOR + (pandl_vol_units)*(MAXIMUM_XFACTOR - MINIMUM_XFACTOR)/(PANDL_UPPER_CUTOFF - PANDL_LOWER_CUTOFF)


    def stoploss(price, vol, raw_position, dynamic_vol=False, dynamic_SL = False):
    """
    Apply trailing stoploss
    :param price:
    :param vol: eg system.rawdata.daily_returns_volatility("SP500")
    :param raw_position: Raw position series, without stoploss or entry / exit logic
    :return: New position series
    """
    assert all(vol.index == price.index)
    assert all(price.index == raw_position.index)

    # assume all lined up
    simple_system_position = simpleSysystemPosition(
    dynamic_vol=dynamic_vol,
    dynamic_SL=dynamic_SL)
    new_position_list = []

    for iday in range(len(price)):
    current_price = price[iday]
    current_vol = vol[iday]

    if simple_system_position.no_current_position:
    # no position, check for signal
    original_position_now = raw_position[iday]
    new_position = simple_system_position.no_position_check_for_trade(original_position_now,
    current_price, current_vol)
    else:
    new_position = simple_system_position.position_on_check_for_close(
    current_price, current_vol)

    new_position_list.append(new_position)

    new_position_df = pd.Series(new_position_list, raw_position.index)

    return new_position_df


    class PositionSizeWithStopLoss(PositionSizing):
    @diagnostic()
    def get_subsystem_position_preliminary(self, instrument_code):
    """
    Get scaled position (assuming for now we trade our entire capital for one instrument)
    """
    self.log.msg(
    "Calculating subsystem position for %s" % instrument_code,
    instrument_code=instrument_code,
    )
    """
    We don't allow this to be changed in config
    """
    avg_abs_forecast = system_defaults["average_absolute_forecast"]

    vol_scalar = self.get_volatility_scalar(instrument_code)

    # forecast is binary + or - avg-abs-forecast
    forecast = self.get_combined_forecast(instrument_code)

    vol_scalar = vol_scalar.reindex(forecast.index).ffill()

    # put on a position according to vol and sign of forecast; this will only take effect on a new trade
    subsystem_position = vol_scalar * forecast / avg_abs_forecast

    return subsystem_position

    @output()
    def get_subsystem_position(self, instrument_code):
    """
    Get scaled position (assuming for now we trade our entire capital for one instrument)
    """

    price = self.parent.rawdata.get_daily_prices(instrument_code)
    vol = self.parent.rawdata.daily_returns_volatility(instrument_code)
    raw_position = self.get_subsystem_position_preliminary(instrument_code)

    subsystem_position = stoploss(price, vol, raw_position, dynamic_vol=self.parent.config.dynamic_vol,
    dynamic_SL = self.parent.config.dynamic_SL)

    return subsystem_position


    simple_mav_rule = TradingRule(
    dict(function=simple_mav, other_args=dict(long=40, short=10))
    )

    data = csvFuturesSimData()



    ## trade by trade p&l
    ## only works at subsystem level

    def system_given_flags(dynamic_vol = False, dynamic_SL = False):
    config = Config(
    dict(
    trading_rules=dict(simple_mav=simple_mav_rule),
    percentage_vol_target=16.0,
    notional_trading_capital=100000000,
    dynamic_vol=dynamic_vol,
    dynamic_SL=dynamic_SL
    )
    )

    system = System(
    [
    Account(),
    Portfolios(),
    PositionSizeWithStopLoss(),
    FuturesRawData(),
    ForecastCombine(),
    ForecastScaleCap(),
    Rules(simple_mav_rule),
    ],
    data,
    config,
    )
    system.set_logging_level("on")

    return system



    def stats(stacked_returns):
    stacked_returns_pandl, stacked_trades_pandl = stacked_returns

    print("SR %f" % sharpe_for_stacked(stacked_returns_pandl))
    print("Skew trades %f" % skew_for_stacked(stacked_trades_pandl))
    print("Skew daily returns %f" % skew_for_stacked(stacked_returns_pandl))
    print("Skew weekly returns %f" % skew_for_stacked(stacked_returns_pandl, period="1W"))
    print("Skew monthly returns %f" % skew_for_stacked(stacked_returns_pandl, period="1M"))


    def stacked_returns_over_instrument(system):

    return_list = []
    for instrument in system.get_instrument_list():
    returns = calc_returns_for_system_and_code(instrument, system)
    return_list.append(returns)

    stacked_returns_pandl = [x[0] for x in return_list]
    stacked_trades_pandl = [x[1] for x in return_list]

    return stacked_returns_pandl, stacked_trades_pandl



    def stack_to_df(stacked_returns, period=None):
    if period is not None:
    new_stacked_returns = [returns.resample(period).sum() for returns in stacked_returns]
    else:
    new_stacked_returns = stacked_returns

    df_returns = pd.concat(new_stacked_returns, axis=0)

    return df_returns


    def calc_returns_for_system_and_code(instrument_code: str, system: System):
    pandl_returns_capital = pandl_capital(instrument_code, system, method_used="returns")
    pandl_trades_capital = pandl_capital(instrument_code, system, method_used="trades")

    return pandl_returns_capital, pandl_trades_capital

    def sharpe_for_stacked(stacked_returns_pandl):
    sharpe_list = [sharpe(pandl_series_capital) for pandl_series_capital in stacked_returns_pandl]
    return np.median(sharpe_list)

    def sharpe(pandl_series_capital):
    avg = pandl_series_capital.mean()
    stdev = pandl_series_capital.std()

    return 16* avg / stdev

    def skew_for_stacked(stacked_returns_pandl, period="1B"):
    resampled = [pandl_series_capital.resample(period) for pandl_series_capital in stacked_returns_pandl]
    skew_list = [pandl_series_capital.skew() for pandl_series_capital in resampled]
    return np.median(skew_list)

    def pandl_capital(instrument_code: str, system: System, method_used: str = "returns"):
    pandl_series_money = pandl_money(instrument_code, system, method_used=method_used)
    capital = system.config.notional_trading_capital

    return pandl_series_money / capital

    def pandl_money(instrument_code: str, system: System, method_used: str= "returns"):
    pos_series = system.positionSize.get_subsystem_position(instrument_code)
    price_series = system.rawdata.get_daily_prices(instrument_code)
    block_size =system.data.get_value_of_block_price_move(instrument_code)
    fx =system.positionSize.get_fx_rate(instrument_code)

    ans = pandl_base(price_series, pos_series, block_size, fx, method_used = method_used)

    return ans

    def pandl_base(price_series: pd.Series, pos_series: pd.DataFrame, block_size: float, fx: pd.Series,
    method_used = "returns"):
    pandl_series_local = pandl_local_ccy(price_series, pos_series, block_size, method_used = method_used)
    fx_matching = fx.reindex(pandl_series_local.index).ffill()

    return pandl_series_local * fx_matching

    def pandl_local_ccy(price_series, pos_series, block_size, method_used = "returns"):
    if method_used == "returns":
    pandl_series_points =pandl_returns_points(price_series, pos_series)
    else:
    ## trades
    pandl_series_points = pandl_trades_points(price_series, pos_series)

    return pandl_series_points * block_size

    def pandl_returns_points(price_series, pos_series):
    """
    Calculate pandl for an individual position
    :param price: price series
    :type price: Tx1 pd.Series
    :param trade_series: set of trades done NOT always aligned to price can be length 0
    :type trade_series: Tx2 pd.DataFrame columns ['qty', 'price']
    :param pos_series: series of positions NOT ALWAYS aligned to price
    :type pos_series: Tx1 pd.Series
    :returns: pd.Series
    """

    # want to have both kinds of price
    price_series = price_series.reindex(pos_series.index, method="ffill")
    price_returns = price_series.diff()
    returns = pos_series.shift(1) * price_returns

    return returns


    def pandl_trades_points(price_series, pos_series):
    """
    :returns: pd.Series, only dates when we're trading
    """

    # want to have both kinds of price
    returns = pandl_returns_points(price_series, pos_series)
    trade_dates_idx = get_trade_dates_idx_from_pos_series(pos_series)
    trade_pandl = [sum_between_trade_dates(trade_dates_idx, idx, returns) for idx in range(len(trade_dates_idx))]

    trade_returns = pd.Series(trade_pandl, index = returns.index[trade_dates_idx])

    return trade_returns

    def get_trade_dates_idx_from_pos_series(pos_series):
    prev_position = pos_series.shift(1)
    trade_dates_idx = [idx for idx, _index_date in enumerate(list(pos_series.index))
    if traded(pos_series, prev_position, idx)]
    if trade_dates_idx[-1]<len(pos_series)-1:
    trade_dates_idx.append(len(pos_series)-1)

    return trade_dates_idx

    def traded(pos_series, prev_position, idx):
    if sign(pos_series[idx]) != sign(prev_position[idx]):
    return True
    else:
    return False

    def sum_between_trade_dates(trade_dates_idx, idx, returns):
    if idx==0:
    previous_date_idx = 0
    else:
    previous_date_idx = trade_dates_idx[idx-1]

    current_idx = trade_dates_idx[idx]

    return returns[previous_date_idx: current_idx].sum()


    system = system_given_flags(dynamic_vol=True, dynamic_SL=True)
    stacked_returns = stacked_returns_over_instrument(system)
    stats(stacked_returns)