Skip to content

Instantly share code, notes, and snippets.

@jaikumarm
Forked from jkleint/timeseries_cnn.py
Created April 17, 2017 23:39
Show Gist options
  • Save jaikumarm/abd6e71a384705942124a3295d5f47b2 to your computer and use it in GitHub Desktop.
Save jaikumarm/abd6e71a384705942124a3295d5f47b2 to your computer and use it in GitHub Desktop.

Revisions

  1. @jkleint jkleint created this gist Jul 29, 2016.
    121 changes: 121 additions & 0 deletions timeseries_cnn.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,121 @@
    #!/usr/bin/env python
    """
    Example of using Keras to implement a 1D convolutional neural network (CNN) for timeseries prediction.
    """

    from __future__ import print_function, division

    import numpy as np
    from keras.layers import Convolution1D, Dense, MaxPooling1D, Flatten
    from keras.models import Sequential


    __date__ = '2016-07-22'


    def make_timeseries_regressor(window_size, filter_length, nb_input_series=1, nb_outputs=1, nb_filter=4):
    """:Return: a Keras Model for predicting the next value in a timeseries given a fixed-size lookback window of previous values.
    The model can handle multiple input timeseries (`nb_input_series`) and multiple prediction targets (`nb_outputs`).
    :param int window_size: The number of previous timeseries values to use as input features. Also called lag or lookback.
    :param int nb_input_series: The number of input timeseries; 1 for a single timeseries.
    The `X` input to ``fit()`` should be an array of shape ``(n_instances, window_size, nb_input_series)``; each instance is
    a 2D array of shape ``(window_size, nb_input_series)``. For example, for `window_size` = 3 and `nb_input_series` = 1 (a
    single timeseries), one instance could be ``[[0], [1], [2]]``. See ``make_timeseries_instances()``.
    :param int nb_outputs: The output dimension, often equal to the number of inputs.
    For each input instance (array with shape ``(window_size, nb_input_series)``), the output is a vector of size `nb_outputs`,
    usually the value(s) predicted to come after the last value in that input instance, i.e., the next value
    in the sequence. The `y` input to ``fit()`` should be an array of shape ``(n_instances, nb_outputs)``.
    :param int filter_length: the size (along the `window_size` dimension) of the sliding window that gets convolved with
    each position along each instance. The difference between 1D and 2D convolution is that a 1D filter's "height" is fixed
    to the number of input timeseries (its "width" being `filter_length`), and it can only slide along the window
    dimension. This is useful as generally the input timeseries have no spatial/ordinal relationship, so it's not
    meaningful to look for patterns that are invariant with respect to subsets of the timeseries.
    :param int nb_filter: The number of different filters to learn (roughly, input patterns to recognize).
    """
    model = Sequential((
    # The first conv layer learns `nb_filter` filters (aka kernels), each of size ``(filter_length, nb_input_series)``.
    # Its output will have shape (None, window_size - filter_length + 1, nb_filter), i.e., for each position in
    # the input timeseries, the activation of each filter at that position.
    Convolution1D(nb_filter=nb_filter, filter_length=filter_length, activation='relu', input_shape=(window_size, nb_input_series)),
    MaxPooling1D(), # Downsample the output of convolution by 2X.
    Convolution1D(nb_filter=nb_filter, filter_length=filter_length, activation='relu'),
    MaxPooling1D(),
    Flatten(),
    Dense(nb_outputs, activation='linear'), # For binary classification, change the activation to 'sigmoid'
    ))
    model.compile(loss='mse', optimizer='adam', metrics=['mae'])
    # To perform (binary) classification instead:
    # model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['binary_accuracy'])
    return model


    def make_timeseries_instances(timeseries, window_size):
    """Make input features and prediction targets from a `timeseries` for use in machine learning.
    :return: A tuple of `(X, y, q)`. `X` are the inputs to a predictor, a 3D ndarray with shape
    ``(timeseries.shape[0] - window_size, window_size, timeseries.shape[1] or 1)``. For each row of `X`, the
    corresponding row of `y` is the next value in the timeseries. The `q` or query is the last instance, what you would use
    to predict a hypothetical next (unprovided) value in the `timeseries`.
    :param ndarray timeseries: Either a simple vector, or a matrix of shape ``(timestep, series_num)``, i.e., time is axis 0 (the
    row) and the series is axis 1 (the column).
    :param int window_size: The number of samples to use as input prediction features (also called the lag or lookback).
    """
    timeseries = np.asarray(timeseries)
    assert 0 < window_size < timeseries.shape[0]
    X = np.atleast_3d(np.array([timeseries[start:start + window_size] for start in range(0, timeseries.shape[0] - window_size)]))
    y = timeseries[window_size:]
    q = np.atleast_3d([timeseries[-window_size:]])
    return X, y, q


    def evaluate_timeseries(timeseries, window_size):
    """Create a 1D CNN regressor to predict the next value in a `timeseries` using the preceding `window_size` elements
    as input features and evaluate its performance.
    :param ndarray timeseries: Timeseries data with time increasing down the rows (the leading dimension/axis).
    :param int window_size: The number of previous timeseries values to use to predict the next.
    """
    filter_length = 5
    nb_filter = 4
    timeseries = np.atleast_2d(timeseries)
    if timeseries.shape[0] == 1:
    timeseries = timeseries.T # Convert 1D vectors to 2D column vectors

    nb_samples, nb_series = timeseries.shape
    print('\n\nTimeseries ({} samples by {} series):\n'.format(nb_samples, nb_series), timeseries)
    model = make_timeseries_regressor(window_size=window_size, filter_length=filter_length, nb_input_series=nb_series, nb_outputs=nb_series, nb_filter=nb_filter)
    print('\n\nModel with input size {}, output size {}, {} conv filters of length {}'.format(model.input_shape, model.output_shape, nb_filter, filter_length))
    model.summary()

    X, y, q = make_timeseries_instances(timeseries, window_size)
    print('\n\nInput features:', X, '\n\nOutput labels:', y, '\n\nQuery vector:', q, sep='\n')
    test_size = int(0.01 * nb_samples) # In real life you'd want to use 0.2 - 0.5
    X_train, X_test, y_train, y_test = X[:-test_size], X[-test_size:], y[:-test_size], y[-test_size:]
    model.fit(X_train, y_train, nb_epoch=25, batch_size=2, validation_data=(X_test, y_test))

    pred = model.predict(X_test)
    print('\n\nactual', 'predicted', sep='\t')
    for actual, predicted in zip(y_test, pred.squeeze()):
    print(actual.squeeze(), predicted, sep='\t')
    print('next', model.predict(q).squeeze(), sep='\t')


    def main():
    """Prepare input data, build model, evaluate."""
    np.set_printoptions(threshold=25)
    ts_length = 1000
    window_size = 50

    print('\nSimple single timeseries vector prediction')
    timeseries = np.arange(ts_length) # The timeseries f(t) = t
    evaluate_timeseries(timeseries, window_size)

    print('\nMultiple-input, multiple-output prediction')
    timeseries = np.array([np.arange(ts_length), -np.arange(ts_length)]).T # The timeseries f(t) = [t, -t]
    evaluate_timeseries(timeseries, window_size)


    if __name__ == '__main__':
    main()