Keras Explicit Matrix Factorization for Missing Feature Imputation
# -*- coding: utf-8 -*-
"""
Created on Sun Oct 22 10:31:23 2017

@author: SF
"""
import numpy as np
from keras.layers import Input
from keras.models import Model
from keras import backend as K
from keras.initializers import RandomNormal
from keras import regularizers
from keras.engine import Layer
from keras.callbacks import EarlyStopping
from fancyimpute import BiScaler
from sklearn.utils import shuffle
class KerasMatrixFactorizer(Layer):
    """Explicit matrix factorization layer: maps an (i, j) index pair to
    <i_embedding, j_embedding> + i_bias + j_bias + constant."""

    def __init__(self, latent_dim, input_dim_i, input_dim_j,
                 embeddings_regularizer=None, **kwargs):
        self.latent_dim = latent_dim
        self.input_dim_i = input_dim_i
        self.input_dim_j = input_dim_j
        self.embeddings_regularizer = regularizers.get(embeddings_regularizer)
        super(KerasMatrixFactorizer, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create the trainable weights: one embedding matrix and one bias
        # vector per axis, plus a single global constant.
        self.i_embedding = self.add_weight(
            shape=(self.input_dim_i, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='i_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.j_embedding = self.add_weight(
            shape=(self.input_dim_j, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='j_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.i_bias = self.add_weight(
            shape=(self.input_dim_i, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='i_bias',
            regularizer=self.embeddings_regularizer
        )
        self.j_bias = self.add_weight(
            shape=(self.input_dim_j, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='j_bias',
            regularizer=self.embeddings_regularizer
        )
        self.constant = self.add_weight(
            shape=(1, 1),
            initializer='zeros',
            name='constant',
        )
        self.built = True
        super(KerasMatrixFactorizer, self).build(input_shape)

    def call(self, inputs):
        if K.dtype(inputs) != 'int32':
            inputs = K.cast(inputs, 'int32')
        # look up the embeddings and biases for each (i, j) pair
        i = inputs[:, 0]  # by convention: row index first, column index second
        j = inputs[:, 1]
        i_embedding = K.gather(self.i_embedding, i)
        j_embedding = K.gather(self.j_embedding, j)
        i_bias = K.gather(self.i_bias, i)
        j_bias = K.gather(self.j_bias, j)
        # <i_embed, j_embed> + i_bias + j_bias + constant
        out = K.batch_dot(i_embedding, j_embedding, axes=[1, 1])
        out += i_bias + j_bias + self.constant
        return out

    def compute_output_shape(self, input_shape):
        return (input_shape[0], 1)
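
# The script below treats missing-feature imputation as matrix completion:
# the (normalized) data matrix is flattened into (row, column) -> value
# triples, the factorizer is fit on the observed entries only, and the
# fitted model is then asked to predict the entries that were masked out.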
# step 0: make some data and a mask marking the entries to treat as missing
from sklearn.datasets import load_digits
X, _ = load_digits(return_X_y=True)
missing_mask = np.random.random(X.shape) < 0.5

# step 1: normalize the data before it is flattened
normalizer = BiScaler()
X = normalizer.fit_transform(X)

# step 2: reshape the data to fit into keras
(n_samples, n_features) = X.shape
observed_mask = ~missing_mask
missing_mask_flat = missing_mask.flatten()
observed_mask_flat = observed_mask.flatten()
columns, rows = np.meshgrid(np.arange(n_features), np.arange(n_samples))

# training data: the observed entries
i_tr = rows.flatten()[observed_mask_flat]
j_tr = columns.flatten()[observed_mask_flat]
ij_tr = np.vstack([i_tr, j_tr]).T  # input to factorizer
y_tr = X.flatten()[observed_mask_flat]  # output of factorizer
ij_tr, y_tr = shuffle(ij_tr, y_tr)

# test data: the masked-out (missing) entries
i_ts = rows.flatten()[missing_mask_flat]
j_ts = columns.flatten()[missing_mask_flat]
ij_ts = np.vstack([i_ts, j_ts]).T  # input to factorizer
y_ts = X.flatten()[missing_mask_flat]  # output of factorizer
# step 3: build and fit a keras model
main_input = Input(shape=(2,), dtype='int32')
embed = KerasMatrixFactorizer(
    latent_dim=10,
    input_dim_i=n_samples,
    input_dim_j=n_features,
    embeddings_regularizer=regularizers.l2(1e-4)
)(main_input)
# d = Dense(1)(embed)  # ensure that the output dimension is correct
model = Model(inputs=main_input, outputs=embed)
model.compile(optimizer='nadam', loss='mse')

callbacks = [EarlyStopping(patience=3, min_delta=0.001)]
model.fit(
    ij_tr,
    y_tr,
    batch_size=128,
    epochs=500,
    validation_split=0.1,
    callbacks=callbacks,
    shuffle=True,
    verbose=2
)

# step 4: test how well it did on the held-out (missing) entries
y_est = model.predict(ij_ts).T[0]
error = np.mean((y_est - y_ts) ** 2)
print(error)
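
To turn the predictions into an actual imputed matrix, the estimates for the missing cells can be written back into the normalized data and the normalization undone. Below is a minimal sketch, assuming fancyimpute's BiScaler exposes an inverse_transform method (as in its README examples):

# fill the missing cells with the model's estimates (still in normalized space);
# boolean indexing assigns in the same row-major order used to build ij_ts
X_imputed_normalized = X.copy()
X_imputed_normalized[missing_mask] = y_est
# undo the row/column normalization to recover the original scale
X_imputed = normalizer.inverse_transform(X_imputed_normalized)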