wfl_tensorflow.py
Forked by @chrinide from ivannp/wfl_tensorflow.py (gist created by @ivannp on Apr 1, 2017; forked Apr 19, 2017)
import dshelper as dsh
import instrumentdb as idb
import logging
import numpy as np
import os
import pandas as pd
import psutil
import sys
import tensorflow as tf
import time

from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis as QDA

from sqlalchemy import create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Float, ForeignKey, UniqueConstraint
from sqlalchemy.orm import sessionmaker

DeclarativeBase = declarative_base()


class Model(DeclarativeBase):
    __tablename__ = 'models'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    __table_args__ = (UniqueConstraint('name', name='unco1'),)


class Forecast(DeclarativeBase):
    __tablename__ = 'forecasts'

    id = Column(Integer, primary_key=True)
    model = Column(Integer, ForeignKey('models.id'), nullable=False)
    symbol = Column(String)
    ts = Column(DateTime)
    fore = Column(Float)
    details = Column(String)
    __table_args__ = (UniqueConstraint('model', 'ts', 'symbol', name='unco1'),)
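

# The walk-forward loop below iterates over a dsh.ForecastLocations object, which
# comes from the external dshelper module and is not defined in this file. A
# minimal sketch of the interface the loop assumes, inferred from how run() uses
# it (the real dshelper class may differ):
#
#     class ForecastLocations:
#         def __init__(self, index, start_date=None):
#             # Parallel arrays of row positions: forecast window ii spans rows
#             # starts[ii]..ends[ii] (inclusive) of the feature frame.
#             self.starts = ...
#             self.ends = ...
#
#         def len(self):
#             return len(self.starts)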


class WalkForwardLoop:
    def __init__(self, model_name, log_file=None, classifier=None, index_format='%Y-%m-%d', db_url=None, scale=True,
                 verbose=False, tensorflow=True):
        self.model_name = model_name  # The model name to use for the database
        self.classifier = classifier  # The classifier object
        self.log_file = log_file
        self.index_format = index_format

        self.db_url = db_url
        self.db_session = None

        self.scale = scale

        if self.db_url is not None:
            self.init_db()

        self.verbose = verbose
        self.tensorflow = tensorflow
    def init_db(self):
        engine = create_engine(self.db_url)
        DeclarativeBase.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        self.db_session = Session()
        try:
            self.db_session.add(Model(name=self.model_name))
            self.db_session.commit()
        except Exception:
            # The model name already exists; roll back and reuse the existing row
            self.db_session.rollback()
        self.model_id = self.db_session.query(Model.id).filter(Model.name == self.model_name).first()[0]

    def run(self, features, response, forecast_locations, max_history=1000000, symbol_column=None, tensorflow=None,
            verbose=None):
        assert len(features) == len(response)

        if isinstance(verbose, bool):
            self.verbose = verbose

        if isinstance(tensorflow, bool):
            self.tensorflow = tensorflow

        if self.db_url is not None:
            self.init_db()

        if sys.platform == 'win32':
            timer = time.clock
        else:
            timer = time.time

        for ii in range(0, forecast_locations.len()):
            # Prepare the range for training for this iteration
            history_end = forecast_locations.starts[ii]
            history_start = 0
            if (history_end - history_start + 1) > max_history:
                history_start = history_end - max_history + 1
            xx = features.iloc[history_start:history_end].as_matrix()
            yy = response.iloc[history_start:history_end].as_matrix()

            # Scale the data
            if self.scale:
                std_scaler = StandardScaler()
                xx = std_scaler.fit_transform(xx)

            fore_xx = features.iloc[forecast_locations.starts[ii]:(forecast_locations.ends[ii] + 1)].as_matrix()
            if self.scale:
                fore_xx = std_scaler.transform(fore_xx)

            # Train the model and predict
            start = timer()
            if self.tensorflow:
                fore = self.tensorflow_fit_predict(xx, yy, fore_xx)
            else:
                # Fall back to the classifier object supplied at construction time
                fore = self.classifier.fit_predict(xx, yy, fore_xx)
            forecasting_time = timer() - start

            fore_df = pd.DataFrame(fore, index=features.iloc[
                forecast_locations.starts[ii]:(forecast_locations.ends[ii] + 1)].index)
            # Map the numeric class (-1/1) to 'short'/'long'. The last column is the predicted class,
            # the preceding columns are the class probabilities.
            # fore_df.columns = np.append(np.array(['short','long'])[self.classes.astype(int) + 1], ['class'])
            fore_df.ix[:, 2] = np.where(fore_df.ix[:, 2] == -1, 'short', 'long')
            fore_df.columns = np.array(['short_prob', 'long_prob', 'class'])

            # print(fore_df)

            fore = fore[:, 2]

            # The reported metric is the winning class probability (skip the class column)
            metric = np.round(np.amax(fore_df.iloc[:, :-1], axis=1), 2)

            # Save results to a database or somewhere else
            if self.db_session is not None:
                for jj in range(len(fore)):
                    row_id = forecast_locations.starts[ii] + jj
                    ts = features.index[row_id]
                    details = fore_df.iloc[[jj]].to_json(orient='split', date_format='iso')
                    if symbol_column is not None:
                        symbol = symbol_column[row_id]
                        rs = self.db_session.query(Forecast.id).filter(Forecast.ts == ts).filter(
                            Forecast.model == self.model_id).filter(Forecast.symbol == symbol).first()
                        if rs is None:
                            ff = Forecast(model=self.model_id, ts=ts, fore=fore[jj], details=details, symbol=symbol)
                            self.db_session.add(ff)
                        else:
                            ff = Forecast(id=rs[0], model=self.model_id, ts=ts, fore=fore[jj], details=details,
                                          symbol=symbol)
                            self.db_session.merge(ff)
                    else:
                        rs = self.db_session.query(Forecast.id).filter(Forecast.ts == ts).filter(
                            Forecast.model == self.model_id).first()
                        if rs is None:
                            ff = Forecast(model=self.model_id, ts=ts, fore=fore[jj], details=details)
                            self.db_session.add(ff)
                        else:
                            ff = Forecast(id=rs[0], model=self.model_id, ts=ts, fore=fore[jj], details=details)
                            self.db_session.merge(ff)

            # Log output
            if self.log_file is not None:
                out_str = "\n" + features.index[forecast_locations.starts[ii]].strftime(self.index_format) + " - " + \
                          features.index[forecast_locations.ends[ii]].strftime(self.index_format) + "\n" + \
                          "=======================\n" + \
                          " history: from: " + features.index[history_start].strftime(self.index_format) + ", to: " + \
                          features.index[history_end - 1].strftime(self.index_format) + \
                          ", length: " + str(history_end - history_start) + "\n" + \
                          " forecast length: " + str(
                              forecast_locations.ends[ii] - forecast_locations.starts[ii] + 1) + "\n" + \
                          " forecast: [" + ','.join(str(round(ff, 2)) for ff in fore) + "]\n" + \
                          " probs: [" + ','.join(str(round(mm, 2)) for mm in metric) + "]\n" + \
                          " time [training+forecasting]: " + str(round(forecasting_time, 2)) + " secs\n"
                with open(self.log_file, "a") as ff:
                    print(out_str, file=ff)

            if self.db_session is not None:
                self.db_session.commit()

    def tensorflow_fit_predict(self, x, y, newx):

        learning_rate = 0.01
        batch_size = 'auto'
        num_passes = 2
        display_step = 1

        if isinstance(batch_size, str):
            if batch_size == 'auto':
                batch_size = min(200, x.shape[0])
            else:
                raise ValueError("'auto' is the only acceptable string for batch_size")

        num_batches = x.shape[0] // batch_size

        # print("num_batches = {0}, batch_size = {1}, x.shape = {2}".format(num_batches, batch_size, x.shape))

        # Map the y's to [0,nlevels)
        classes = np.sort(np.unique(y))
        self.classes = classes
        yz = np.searchsorted(classes, y)

        # One hot encode them
        ohe = OneHotEncoder(n_values=len(classes), sparse=False)
        yy = ohe.fit_transform(yz)

        res = None

        nfeatures = x.shape[1]
        nlabels = yy.shape[1]

        # Define the tensorflow graph. A new graph for each iteration. Otherwise, all iterations use
        # the default graph, and the memory usage explodes quickly.
        with tf.Graph().as_default():
            input = tf.placeholder(tf.float32, [None, nfeatures])
            label = tf.placeholder(tf.float32, [None, nlabels])

            # A single 1x3 convolution over the feature vector, followed by 1x2 max-pooling
            nconv1 = 32
            cw1 = tf.Variable(tf.random_normal([1, 3, 1, nconv1]))
            cb1 = tf.Variable(tf.random_normal([nconv1]))
            conv_input = tf.reshape(input, shape=[-1, 1, nfeatures, 1])
            cl1 = tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(conv_input, cw1, strides=[1, 1, 1, 1], padding='SAME'), cb1))
            mp1 = tf.nn.max_pool(cl1, ksize=[1, 1, 2, 1], strides=[1, 1, 2, 1], padding='SAME')

            # With padding='SAME' and stride 2, the max-pool halves the width (rounding up).
            # For the 3*252 = 756 features used below this is the 378 of the original hard-coded 378*32.
            pooled_width = (nfeatures + 1) // 2

            nhidden1 = 128
            w1 = tf.Variable(tf.random_normal([pooled_width * nconv1, nhidden1]))
            b1 = tf.Variable(tf.random_normal([nhidden1]))
            fc_input = tf.reshape(mp1, [-1, w1.get_shape().as_list()[0]])
            l1 = tf.nn.relu(tf.add(tf.matmul(fc_input, w1), b1))

            nhidden2 = 128
            w2 = tf.Variable(tf.random_normal([nhidden1, nhidden2]))
            b2 = tf.Variable(tf.random_normal([nhidden2]))
            l2 = tf.nn.relu(tf.add(tf.matmul(l1, w2), b2))

            w3 = tf.Variable(tf.random_normal([nhidden2, nlabels]))
            b3 = tf.Variable(tf.random_normal([nlabels]))
            model = tf.add(tf.matmul(l2, w3), b3)

            cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=model, labels=label))
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

            correct_fores = tf.equal(tf.argmax(model, 1), tf.argmax(label, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_fores, tf.float32))

            # Initializing the variables
            init_op = tf.global_variables_initializer()

            # Train our neural network
            all_features = np.array_split(x, num_batches)
            all_labels = np.array_split(yy, num_batches)

            # Launch the graph
            with tf.Session() as sess:
                sess.run(init_op)

                total_batches = num_batches * num_passes

                # Log some data
                first_shape = all_features[0].shape
                last_shape = all_features[len(all_features) - 1].shape
                logging.debug("total_batches = {0}, first shape = {1}, last shape = {2}, x shape = {3}, y shape = {4}".format(
                    total_batches, first_shape, last_shape, x.shape, yy.shape))
                process = psutil.Process(os.getpid())
                logging.debug(process.memory_info().rss)

                for ii in range(total_batches):
                    features = np.ascontiguousarray(all_features[ii % num_batches])
                    labels = np.ascontiguousarray(all_labels[ii % num_batches])

                    _, cc, acc = sess.run([optimizer, cost, accuracy], feed_dict={input: features, label: labels})

                    if ii % display_step == 0 and self.verbose:
                        logging.info("Minibatch: {0:04d}, Loss: {1:.4f}, Accuracy: {2:.2f}%".format(ii + 1, cc, acc * 100))

                # Predict
                out = tf.nn.softmax(model)
                probs = sess.run(out, feed_dict={input: newx})
                # probs = sess.run(tf.argmax(model, 1), feed_dict={input: newx})
                # print(probs)

                if len(probs.shape) == 1:
                    probs = np.reshape(probs, (1, -1))

                # Append the winning class to the probabilities, one row per forecast
                res = np.append(probs, self.classes[np.argmax(probs, 1)].reshape(-1, 1), axis=1)

        return res


def stack_series(all_data, series):
    res = None
    for ss in series:
        tt = pd.concat([all_data[ss]['full']['entry'], all_data[ss]['features']], axis=1).dropna()
        # tt['symbol'] = pd.Series(ss, index=tt.index)
        tt.insert(0, 'symbol', pd.Series(ss, index=tt.index))
        if res is None:
            res = tt
        else:
            res = res.append(tt)
    res = res.sort_index()
    # res = res.sort(['ts','symbol'])
    return res


def drive_wfl():
    # symbols = ['HO2']
    symbols = ['HO2', 'CL2']

    all_data = dsh.load('all_data.bin')

    combined = stack_series(all_data, symbols)
    # print(combined['symbol'].tail(20))

    symbol_column = None
    if 'symbol' in combined.columns:
        symbol_column = combined['symbol']
        combined.drop('symbol', axis=1, inplace=True)

    response = combined.iloc[:, 0]
    features = combined.iloc[:, 1:]

    fl = dsh.ForecastLocations(features.index)

    ml = WalkForwardLoop('qda', 'ml.log', db_url='sqlite:///ml.sqlite')
    ml.run(features, response, fl, symbol_column=symbol_column)


def extend_price(ts1, ts2):
    first_index = np.where(ts2.index == ts1.index[0])
    if len(first_index) != 1 or len(first_index[0]) != 1:
        raise ValueError('Failed to find the index to stitch the series')
    first_index = first_index[0][0]
    res = ts2.pct_change().shift(-1)[:first_index].append(ts1)
    # Walk the series backwards, building the prices from the returns
    for ii in range(first_index, 0, -1):
        res[ii - 1] = res[ii] / (1.0 + res[ii - 1])
    return res
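
# A small worked example of the backward reconstruction above (illustrative
# numbers, not taken from the data): suppose res[ii] already holds a price of
# 50.0 and res[ii - 1] still holds the return 0.25; then res[ii - 1] becomes
# 50.0 / (1.0 + 0.25) = 40.0, i.e. the price that grows by 25% to reach 50.0.
# Repeating this down to index 0 turns the prepended return series into a
# backward-extended price series that joins ts1 at its first timestamp.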


def pinnacle_csv(csv_path):
    ss = pd.read_csv(csv_path, header=None, parse_dates=True, index_col=0)
    ss = ss.ix[:, 0:4]
    ss.columns = ['open', 'high', 'low', 'close']
    return ss


def returns_wfl():
    ho_pin = pinnacle_csv('d:/DATA/CLCDATA/HO_REV.CSV')
    ho_pin = ho_pin.ix[:, 3]

    db = idb.CsiDb()
    ho_csi = db.load_bars('HO2')
    ho_csi = ho_csi.ix[:, 3]

    ho_ext = np.round(extend_price(ho_csi, ho_pin), 4)

    rets = ho_ext.pct_change()
    erets = rets.pow(2).ewm(span=36).mean().pow(1 / 2)
    arets = rets / erets
    arets = arets.dropna()

    history_len = 3 * 252  # Three years
    nrows = len(arets) - history_len
    mm = np.full((nrows, history_len), np.nan)
    for ii in range(history_len, len(arets)):
        mm[ii - history_len, :] = arets[(ii - history_len + 1):(ii + 1)]

    response = np.where(arets < 0, -1, 1)
    response = pd.DataFrame(response, index=arets.index)
    # Remove the first history_len + 1 rows. The extra row is removed because the
    # features are shifted one position forward to align with the response, so we
    # lose one more observation.
    response = response.tail(-history_len - 1)
    features = mm[:(mm.shape[0] - 1), :]
    features = pd.DataFrame(features, index=response.index)

    # print(response.head())
    # print(arets.head())
    # print(features.head().iloc[:,-3:])

    # print(response.tail())
    # print(arets.tail())
    # print(features.tail().iloc[:,-3:])

    fl = dsh.ForecastLocations(features.index, start_date="2011-12-31")

    ml = WalkForwardLoop('tf_conv', log_file='ml.log', db_url='sqlite:///ml.sqlite')
    ml.run(features, response, fl, verbose=False, tensorflow=True)


def main():
    # Init logging
    logging.basicConfig(filename='diag.log', level=logging.DEBUG)
    returns_wfl()


if __name__ == "__main__":
    main()