Skip to content

Instantly share code, notes, and snippets.

@raphant
Created October 29, 2023 08:28
Show Gist options
  • Save raphant/97ac3f49a738a03d7d9a9cd89dbc80d7 to your computer and use it in GitHub Desktop.
Save raphant/97ac3f49a738a03d7d9a9cd89dbc80d7 to your computer and use it in GitHub Desktop.

Revisions

  1. raphant created this gist Oct 29, 2023.
    244 changes: 244 additions & 0 deletions custom_indicators.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,244 @@
    import numpy as np
    import pandas as pd
    import pandas_ta
    import talib

    import pandas_ta as ta
    import finta as TA


    def add_adx_di_signal(df, period=14):
    # Calculate ADX and DI
    adx = talib.ADX(df["high"], df["low"], df["close"], timeperiod=period)
    plus_di = talib.PLUS_DI(df["high"], df["low"], df["close"], timeperiod=period)
    minus_di = talib.MINUS_DI(df["high"], df["low"], df["close"], timeperiod=period)

    # Initialize signal column with 0
    df["adx_di_signal"] = 0

    # Loop through the dataframe
    for i in range(period, len(df)):
    # Check if ADX is above 25 and +DI is above -DI
    if adx[i] > 25 and plus_di[i] > minus_di[i]:
    df.loc[df.index[i], "adx_di_signal"] = 1
    return df


    def calculate_smi(df, k_length, d_length):
    ll = df["low"].rolling(window=k_length).min()
    hh = df["high"].rolling(window=k_length).max()
    diff = hh - ll
    rdiff = df["close"] - (hh + ll) / 2
    avgrel = talib.EMA(talib.EMA(rdiff, d_length), d_length)
    avgdiff = talib.EMA(talib.EMA(diff, d_length), d_length)
    smi = np.where(avgdiff != 0, (avgrel / (avgdiff / 2) * 100), 0)
    return smi


    def add_smi_signal(df, k_length=10, d_length=3, ema_length=10) -> pd.DataFrame:
    # Calculate SMI and EMA of SMI
    smi = calculate_smi(df, k_length, d_length)
    smi_ema = talib.EMA(smi, ema_length)

    # Initialize signal column with 0
    df["smi_signal"] = 0

    # Loop through the dataframe
    for i in range(2, len(df)):
    # Check if SMI crossed above lower band and crossed above the SMI EMA within the next 3 tickers
    if smi[i - 2] < -40 < smi[i - 1] and smi[i - 1] > smi_ema[i - 1]:
    df.loc[df.index[i], "smi_signal"] = 1
    return df


    def add_ema_signal(df, fast_length=50, slow_length=200) -> pd.DataFrame:
    # Calculate fast and slow EMA
    fast_ema = talib.EMA(df["close"], fast_length)
    slow_ema = talib.EMA(df["close"], slow_length)

    # Initialize signal column with 0
    df["ema_signal"] = 0

    # Loop through the dataframe
    for i in range(1, len(df)):
    # Check if fast EMA > slow EMA and price touched, but closed above the fast EMA on last candle
    if (
    slow_ema[i - 1] < fast_ema[i - 1] < df.loc[df.index[i - 1], "close"]
    and df.loc[df.index[i - 1], "low"] <= fast_ema[i - 1]
    ):
    df.loc[df.index[i], "ema_signal"] = 1
    return df


    def add_sar_signal(df, ema_length=200, window_length=200):
    # Calculate SAR and EMA
    sar = talib.SAR(df["high"], df["low"])
    ema = talib.EMA(df["close"], ema_length)

    # Calculate the distance between SAR and price
    sar_distance = abs(df["close"] - sar) / df["close"]

    # Initialize signal column with 0.0
    df["sar_signal"] = 0.0

    # Loop through the dataframe
    for i in range(window_length, len(df)):
    # Normalize sar_distance to range 0-1 using a rolling window
    min_sar_distance = min(
    sar_distance[i - window_length : i + 1].min(), sar_distance[i]
    )
    max_sar_distance = max(
    sar_distance[i - window_length : i + 1].max(), sar_distance[i]
    )
    normalized_sar_distance = (sar_distance[i] - min_sar_distance) / (
    max_sar_distance - min_sar_distance + 1e-10
    )

    # Check if SAR and price are above EMA200 and SAR crosses under price
    if (
    sar[i - 1] > ema[i - 1]
    and df.loc[df.index[i - 1], "close"] > ema[i - 1]
    and sar[i] < df.loc[df.index[i], "close"]
    ):
    # Check if SAR is diverging
    if (
    sar[i] < sar[i - 1]
    and df.loc[df.index[i], "close"] > df.loc[df.index[i - 1], "close"]
    ):
    # Set sar_signal to normalized sar_distance
    df.loc[df.index[i], "sar_signal"] = normalized_sar_distance
    return df


    def calculate_smoothed_ha(df, smooth_length=5):
    # Calculate Heiken Ashi candles
    ha = pandas_ta.ha(df["open"], df["high"], df["low"], df["close"])

    # Smooth the Heiken Ashi candles
    ha_smooth = ha.ewm(span=smooth_length).mean()

    return ha_smooth


    def add_ha_signal(df, smooth_length=5):
    # Calculate smoothed Heiken Ashi candles
    ha_smooth = calculate_smoothed_ha(df, smooth_length)

    # Initialize signal column with 0.0
    df["ha_signal"] = 0.0

    # Calculate the width of the Heiken Ashi bar
    ha_width = abs(ha_smooth["HA_close"] - ha_smooth["HA_open"])

    # Normalize ha_width to range 0-1
    min_ha_width = ha_width.min()
    max_ha_width = ha_width.max()
    normalized_ha_width = (ha_width - min_ha_width) / (max_ha_width - min_ha_width)

    # Loop through the dataframe
    for i in range(1, len(df)):
    # Check if HA is trending "green" or bullish
    if (
    ha_smooth.loc[ha_smooth.index[i], "HA_close"]
    > ha_smooth.loc[ha_smooth.index[i], "HA_open"]
    ):
    # Set ha_signal to normalized ha_width
    df.loc[df.index[i], "ha_signal"] = normalized_ha_width[i]
    return df


    import numpy as np


    def calculate_donchian_channel(df, period=200):
    # Calculate upper and lower band of the Donchian Channel
    upper_band = df["high"].rolling(window=period).max()
    lower_band = df["low"].rolling(window=period).min()

    return upper_band, lower_band


    import numpy as np


    def calculate_donchian_channel(df, period=200):
    # Calculate upper and lower band of the Donchian Channel
    upper_band = df["high"].rolling(window=period).max()
    lower_band = df["low"].rolling(window=period).min()

    return upper_band, lower_band


    def add_donchian_trend_signal(df, period=200) -> pd.DataFrame:
    # Calculate Donchian Channel
    upper_band, lower_band = calculate_donchian_channel(df, period)

    # Initialize signal column with 0
    df["donchian_trend_signal"] = 0

    # Calculate the percentage that the price is above the bullish line for each candle
    df["percentage_above_bullish_line"] = (df["close"] - upper_band) / upper_band * 100

    # Calculate the average and standard deviation of these percentages over the period
    df["average_percentage"] = (
    df["percentage_above_bullish_line"].rolling(window=period).mean()
    )
    df["std_dev_percentage"] = (
    df["percentage_above_bullish_line"].rolling(window=period).std()
    )

    # Normalize the average percentage by dividing it by the sum of the average percentage and the standard deviation
    # Then apply the sigmoid function to squash the values between 0 and 1
    df["donchian_trend_signal"] = 1 / (
    1
    + np.exp(
    -df["average_percentage"]
    / (df["average_percentage"] + df["std_dev_percentage"])
    )
    )

    # Replace any NaN values with 0
    df["donchian_trend_signal"].fillna(0, inplace=True)

    # Drop the intermediate columns used for calculation
    df.drop(
    columns=[
    "percentage_above_bullish_line",
    "average_percentage",
    "std_dev_percentage",
    ],
    inplace=True,
    )

    return df


    def add_rsi_signal(df: pd.DataFrame, period=14) -> pd.DataFrame:
    # Calculate RSI
    rsi = talib.RSI(df["close"], timeperiod=period)

    # Initialize signal column with 0
    df["rsi_signal"] = 0

    # Loop through the dataframe and check if RSI crossed above 30
    for i in range(period, len(df)):
    # Check if RSI is oversold
    if rsi[i] < 30:
    df.loc[df.index[i], "rsi_signal"] = 1
    return df


    def add_bollinger_signal(df: pd.DataFrame, period=20, std_dev=2) -> pd.DataFrame:
    # Calculate Bollinger Bands
    upper_band, middle_band, lower_band = talib.BBANDS(
    df["close"], timeperiod=period, nbdevup=std_dev, nbdevdn=std_dev
    )

    # Initialize signal column with 0
    df["bollinger_signal"] = 0

    # Loop through the dataframe and check if price is below lower band
    for i in range(len(df)):
    if df["close"].iloc[i] < lower_band.iloc[i]:
    df["bollinger_signal"].iloc[i] = 1
    return df