
@mindis
Created June 25, 2019 14:48

Revisions

  1. @sergeyf sergeyf revised this gist Oct 22, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion keras_explicit_MF.py
    @@ -117,7 +117,7 @@ def compute_output_shape(self, input_shape):
      input_dim_j=n_features,
      embeddings_regularizer=regularizers.l2(1e-4)
      )(main_input)
    - #d = Dense(4)(embed)
    + #d = Dense(1)(embed) # ensure that output dimension is correct
      model = Model(inputs=main_input, outputs=embed)
      model.compile(optimizer='nadam', loss='mse')
      callbacks = [EarlyStopping(patience=3, min_delta=0.001)]
  2. @sergeyf sergeyf created this gist Oct 22, 2017.
    138 changes: 138 additions & 0 deletions keras_explicit_MF.py
    @@ -0,0 +1,138 @@
    # -*- coding: utf-8 -*-
    """
    Created on Sun Oct 22 10:31:23 2017
    @author: SF
    """

    import numpy as np
    from keras.layers import Input
    from keras.models import Model
    from keras import backend as K
    from keras.initializers import RandomNormal
    from keras import regularizers
    from keras.engine import Layer
    from keras.callbacks import EarlyStopping

    from fancyimpute import BiScaler
    from sklearn.utils import shuffle

    class KerasMatrixFactorizer(Layer):

        def __init__(self, latent_dim, input_dim_i, input_dim_j, embeddings_regularizer=None, **kwargs):
            self.latent_dim = latent_dim
            self.input_dim_i = input_dim_i
            self.input_dim_j = input_dim_j
            self.embeddings_regularizer = regularizers.get(embeddings_regularizer)
            super(KerasMatrixFactorizer, self).__init__(**kwargs)

        def build(self, input_shape):
            # Create the trainable weights for this layer:
            # row/column embeddings, row/column biases, and a global constant.
            self.i_embedding = self.add_weight(
                shape=(self.input_dim_i, self.latent_dim),
                initializer=RandomNormal(mean=0.0, stddev=1/np.sqrt(self.latent_dim)),
                name='i_embedding',
                regularizer=self.embeddings_regularizer
            )
            self.j_embedding = self.add_weight(
                shape=(self.input_dim_j, self.latent_dim),
                initializer=RandomNormal(mean=0.0, stddev=1/np.sqrt(self.latent_dim)),
                name='j_embedding',
                regularizer=self.embeddings_regularizer
            )
            self.i_bias = self.add_weight(
                shape=(self.input_dim_i, 1),
                initializer=RandomNormal(mean=0.0, stddev=1/np.sqrt(2)),
                name='i_bias',
                regularizer=self.embeddings_regularizer
            )
            self.j_bias = self.add_weight(
                shape=(self.input_dim_j, 1),
                initializer=RandomNormal(mean=0.0, stddev=1/np.sqrt(2)),
                name='j_bias',
                regularizer=self.embeddings_regularizer
            )
            self.constant = self.add_weight(
                shape=(1, 1),
                initializer='zeros',
                name='constant',
            )

            self.built = True
            super(KerasMatrixFactorizer, self).build(input_shape)

        def call(self, inputs):
            if K.dtype(inputs) != 'int32':
                inputs = K.cast(inputs, 'int32')
            # look up the embeddings and biases for each (i, j) index pair
            i = inputs[:, 0]  # by convention, column 0 holds row indices
            j = inputs[:, 1]  # and column 1 holds column indices
            i_embedding = K.gather(self.i_embedding, i)
            j_embedding = K.gather(self.j_embedding, j)
            i_bias = K.gather(self.i_bias, i)
            j_bias = K.gather(self.j_bias, j)
            # prediction: <i_embed, j_embed> + i_bias + j_bias + constant
            out = K.batch_dot(i_embedding, j_embedding, axes=[1, 1])
            out += i_bias + j_bias + self.constant
            return out

        def compute_output_shape(self, input_shape):
            return (input_shape[0], 1)
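
    # For reference, for an index pair (i, j) the layer predicts
    #     X_hat[i, j] = <i_embedding[i], j_embedding[j]> + i_bias[i] + j_bias[j] + constant,
    # i.e. a biased low-rank factorization of the partially observed matrix.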

    # step 0: make some data and a mask
    from sklearn.datasets import load_digits
    X, _ = load_digits(return_X_y=True)
    missing_mask = np.random.random(X.shape) < 0.5

    # step 1: normalize data before it's flattened
    normalizer = BiScaler()
    X = normalizer.fit_transform(X)
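    # (fancyimpute's BiScaler iteratively centers and scales rows and columns, so the
    # factorizer below works on a roughly doubly-standardized matrix.)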

    # step 2: shape data to fit into keras
    (n_samples, n_features) = X.shape
    observed_mask = ~missing_mask
    missing_mask_flat = missing_mask.flatten()
    observed_mask_flat = observed_mask.flatten()

    columns, rows = np.meshgrid(np.arange(n_features), np.arange(n_samples))

    # training data
    i_tr = rows.flatten()[observed_mask_flat]
    j_tr = columns.flatten()[observed_mask_flat]
    ij_tr = np.vstack([i_tr, j_tr]).T # input to factorizer
    y_tr = X.flatten()[observed_mask_flat] # output of factorizer
    ij_tr, y_tr = shuffle(ij_tr, y_tr)

    # test data
    i_ts = rows.flatten()[missing_mask_flat]
    j_ts = columns.flatten()[missing_mask_flat]
    ij_ts = np.vstack([i_ts, j_ts]).T # input to factorizer
    y_ts = X.flatten()[missing_mask_flat] # output of factorizer

    # make a keras model
    main_input = Input(shape=(2,), dtype='int32')
    embed = KerasMatrixFactorizer(
        latent_dim=10,
        input_dim_i=n_samples,
        input_dim_j=n_features,
        embeddings_regularizer=regularizers.l2(1e-4)
    )(main_input)
    #d = Dense(4)(embed)
    model = Model(inputs=main_input, outputs=embed)
    model.compile(optimizer='nadam', loss='mse')
    callbacks = [EarlyStopping(patience=3, min_delta=0.001)]
    model.fit(
        ij_tr,
        y_tr,
        batch_size=128,
        epochs=500,
        validation_split=0.1,
        callbacks=callbacks,
        shuffle=True,
        verbose=2
    )

    # test how well it did
    y_est = model.predict(ij_ts).T[0]
    error = np.mean((y_est - y_ts)**2)
    print(error)
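
    # A possible follow-up (a sketch, not in the original gist): fill the missing entries
    # with the model's predictions and undo the BiScaler normalization. This assumes the
    # installed fancyimpute BiScaler exposes inverse_transform, as in its README examples.
    X_filled = X.copy()
    X_filled[missing_mask] = y_est  # boolean assignment follows the same row-major order as y_est
    X_filled = normalizer.inverse_transform(X_filled)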