# -*- coding: utf-8 -*-
"""
Created on Sun Oct 22 10:31:23 2017

@author: SF
"""
import numpy as np
from keras.layers import Input
from keras.models import Model
from keras import backend as K
from keras.initializers import RandomNormal
from keras import regularizers
from keras.engine import Layer
from keras.callbacks import EarlyStopping
from fancyimpute import BiScaler
from sklearn.utils import shuffle


class KerasMatrixFactorizer(Layer):

    def __init__(self, latent_dim, input_dim_i, input_dim_j,
                 embeddings_regularizer=None, **kwargs):
        self.latent_dim = latent_dim
        self.input_dim_i = input_dim_i
        self.input_dim_j = input_dim_j
        self.embeddings_regularizer = regularizers.get(embeddings_regularizer)
        super(KerasMatrixFactorizer, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create the trainable weights for this layer: two embedding matrices,
        # two bias vectors and a global constant.
        self.i_embedding = self.add_weight(
            shape=(self.input_dim_i, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='i_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.j_embedding = self.add_weight(
            shape=(self.input_dim_j, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='j_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.i_bias = self.add_weight(
            shape=(self.input_dim_i, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='i_bias',
            regularizer=self.embeddings_regularizer
        )
        self.j_bias = self.add_weight(
            shape=(self.input_dim_j, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='j_bias',
            regularizer=self.embeddings_regularizer
        )
        self.constant = self.add_weight(
            shape=(1, 1),
            initializer='zeros',
            name='constant',
        )
        super(KerasMatrixFactorizer, self).build(input_shape)

    def call(self, inputs):
        if K.dtype(inputs) != 'int32':
            inputs = K.cast(inputs, 'int32')
        # look up the embeddings and biases for the requested index pairs
        i = inputs[:, 0]  # by convention
        j = inputs[:, 1]
        i_embedding = K.gather(self.i_embedding, i)
        j_embedding = K.gather(self.j_embedding, j)
        i_bias = K.gather(self.i_bias, i)
        j_bias = K.gather(self.j_bias, j)
        # prediction: <i_embedding, j_embedding> + i_bias + j_bias + constant
        out = K.batch_dot(i_embedding, j_embedding, axes=[1, 1])
        out += i_bias + j_bias + self.constant
        return out

    def compute_output_shape(self, input_shape):
        return (input_shape[0], 1)


# step 0: make some data and a mask
from sklearn.datasets import load_digits
X, _ = load_digits(return_X_y=True)
missing_mask = np.random.random(X.shape) < 0.5

# step 1: normalize data before it's flattened
normalizer = BiScaler()
X = normalizer.fit_transform(X)

# step 2: shape data to fit into keras
(n_samples, n_features) = X.shape
observed_mask = ~missing_mask
missing_mask_flat = missing_mask.flatten()
observed_mask_flat = observed_mask.flatten()
columns, rows = np.meshgrid(np.arange(n_features), np.arange(n_samples))

# training data: the observed entries
i_tr = rows.flatten()[observed_mask_flat]
j_tr = columns.flatten()[observed_mask_flat]
ij_tr = np.vstack([i_tr, j_tr]).T  # input to factorizer
y_tr = X.flatten()[observed_mask_flat]  # output of factorizer
ij_tr, y_tr = shuffle(ij_tr, y_tr)

# test data: the missing entries
i_ts = rows.flatten()[missing_mask_flat]
j_ts = columns.flatten()[missing_mask_flat]
ij_ts = np.vstack([i_ts, j_ts]).T  # input to factorizer
y_ts = X.flatten()[missing_mask_flat]  # output of factorizer
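# Optional sanity check (illustrative, not required for training): because ij_tr
# and y_tr were shuffled together, each (row, column) pair should still index its
# matching observed entry of the normalized matrix. equal_nan=True guards against
# NaNs the normalizer may leave in constant columns.
assert np.allclose(X[ij_tr[:, 0], ij_tr[:, 1]], y_tr, equal_nan=True)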
# make a keras model
main_input = Input(shape=(2,), dtype='int32')
embed = KerasMatrixFactorizer(
    latent_dim=10,
    input_dim_i=n_samples,
    input_dim_j=n_features,
    embeddings_regularizer=regularizers.l2(1e-4)
)(main_input)
# d = Dense(1)(embed)  # ensure that output dimension is correct
model = Model(inputs=main_input, outputs=embed)
model.compile(optimizer='nadam', loss='mse')
callbacks = [EarlyStopping(patience=3, min_delta=0.001)]
model.fit(
    ij_tr, y_tr,
    batch_size=128,
    epochs=500,
    validation_split=0.1,
    callbacks=callbacks,
    shuffle=True,
    verbose=2
)

# test how well it did on the held-out (missing) entries
y_est = model.predict(ij_ts).T[0]
error = np.mean((y_est - y_ts)**2)
print(error)
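# Optional follow-up sketch: fill the missing entries with the model's predictions
# and undo the normalization to get a completed matrix on the original scale.
# This assumes BiScaler exposes inverse_transform, as in fancyimpute's examples;
# the test indices were built in row-major order and never shuffled, so y_est
# lines up with missing_mask under boolean assignment.
X_completed = X.copy()
X_completed[missing_mask] = y_est
X_completed = normalizer.inverse_transform(X_completed)
print(X_completed.shape)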