@mindis · Created June 25, 2019 14:48
Keras Explicit Matrix Factorization for Missing Feature Imputation
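A custom Keras layer that learns an explicit (biased) matrix factorization from observed entries only, then predicts the held-out entries. Each entry is modeled as x_ij ≈ <u_i, v_j> + b_i + b_j + c, where u_i and v_j are learned row/column embeddings, b_i and b_j are per-row/per-column biases, and c is a global offset. The demo masks roughly half of the sklearn digits matrix at random, trains on the observed half, and reports MSE on the missing half.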
# -*- coding: utf-8 -*-
"""
Created on Sun Oct 22 10:31:23 2017
@author: SF
"""
import numpy as np
from keras.layers import Input
from keras.models import Model
from keras import backend as K
from keras.initializers import RandomNormal
from keras import regularizers
from keras.layers import Layer
from keras.callbacks import EarlyStopping
from fancyimpute import BiScaler
from sklearn.utils import shuffle
class KerasMatrixFactorizer(Layer):
    """Explicit (biased) matrix factorization:
    x_ij is approximated by <u_i, v_j> + b_i + b_j + c.
    """

    def __init__(self, latent_dim, input_dim_i, input_dim_j,
                 embeddings_regularizer=None, **kwargs):
        self.latent_dim = latent_dim
        self.input_dim_i = input_dim_i
        self.input_dim_j = input_dim_j
        self.embeddings_regularizer = regularizers.get(embeddings_regularizer)
        super(KerasMatrixFactorizer, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create trainable embedding and bias tables for rows (i) and columns (j).
        self.i_embedding = self.add_weight(
            shape=(self.input_dim_i, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='i_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.j_embedding = self.add_weight(
            shape=(self.input_dim_j, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='j_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.i_bias = self.add_weight(
            shape=(self.input_dim_i, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='i_bias',
            regularizer=self.embeddings_regularizer
        )
        self.j_bias = self.add_weight(
            shape=(self.input_dim_j, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='j_bias',
            regularizer=self.embeddings_regularizer
        )
        # Global offset shared by all entries.
        self.constant = self.add_weight(
            shape=(1, 1),
            initializer='zeros',
            name='constant',
        )
        super(KerasMatrixFactorizer, self).build(input_shape)  # sets self.built = True

    def call(self, inputs):
        if K.dtype(inputs) != 'int32':
            inputs = K.cast(inputs, 'int32')
        # look up the embeddings and biases for each (i, j) index pair
        i = inputs[:, 0]  # by convention, column 0 holds row indices
        j = inputs[:, 1]  # and column 1 holds column indices
        i_embedding = K.gather(self.i_embedding, i)
        j_embedding = K.gather(self.j_embedding, j)
        i_bias = K.gather(self.i_bias, i)
        j_bias = K.gather(self.j_bias, j)
        # <i_embed, j_embed> + i_bias + j_bias + constant
        out = K.batch_dot(i_embedding, j_embedding, axes=[1, 1])
        out += i_bias + j_bias + self.constant
        return out

    def compute_output_shape(self, input_shape):
        return (input_shape[0], 1)
# step 0: make some data and a mask
from sklearn.datasets import load_digits
X, _ = load_digits(return_X_y=True)
missing_mask = np.random.random(X.shape) < 0.5
# step 1: normalize data before it's flattened
# (in a real imputation task the missing entries would typically be set to
# np.nan first, so that BiScaler normalizes using only the observed values)
normalizer = BiScaler()
X = normalizer.fit_transform(X)
# step 2: shape data to fit into keras
(n_samples, n_features) = X.shape
observed_mask = ~missing_mask
missing_mask_flat = missing_mask.flatten()
observed_mask_flat = observed_mask.flatten()
# index grids so that rows[s, f] == s and columns[s, f] == f for every entry
columns, rows = np.meshgrid(np.arange(n_features), np.arange(n_samples))
# training data
i_tr = rows.flatten()[observed_mask_flat]
j_tr = columns.flatten()[observed_mask_flat]
ij_tr = np.vstack([i_tr, j_tr]).T # input to factorizer
y_tr = X.flatten()[observed_mask_flat] # output of factorizer
ij_tr, y_tr = shuffle(ij_tr, y_tr)
# test data
i_ts = rows.flatten()[missing_mask_flat]
j_ts = columns.flatten()[missing_mask_flat]
ij_ts = np.vstack([i_ts, j_ts]).T # input to factorizer
y_ts = X.flatten()[missing_mask_flat] # output of factorizer
# make a keras model
main_input = Input(shape=(2,), dtype='int32')
embed = KerasMatrixFactorizer(
    latent_dim=10,
    input_dim_i=n_samples,
    input_dim_j=n_features,
    embeddings_regularizer=regularizers.l2(1e-4)
)(main_input)
# d = Dense(1)(embed)  # optional: would also need `from keras.layers import Dense`
model = Model(inputs=main_input, outputs=embed)
model.compile(optimizer='nadam', loss='mse')
callbacks = [EarlyStopping(patience=3, min_delta=0.001)]
model.fit(
    ij_tr,
    y_tr,
    batch_size=128,
    epochs=500,
    validation_split=0.1,
    callbacks=callbacks,
    shuffle=True,
    verbose=2
)
# test how well it did: MSE on the held-out (missing) entries
y_est = model.predict(ij_ts).T[0]
error = np.mean((y_est - y_ts)**2)
print(error)
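To actually impute the matrix, the predictions can be written back into the masked positions and the BiScaler normalization inverted. A minimal sketch, assuming the variables above are still in scope and using fancyimpute's BiScaler.inverse_transform:

# fill predictions into the missing positions; boolean indexing on
# missing_mask walks the entries in the same row-major order used to
# build ij_ts, so y_est lines up position-for-position
X_imputed = X.copy()
X_imputed[missing_mask] = y_est
# undo the row/column biscaling to return to the original data scale
X_imputed = normalizer.inverse_transform(X_imputed)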