Created June 25, 2019 14:48
Revisions
sergeyf revised this gist
Oct 22, 2017. 1 changed file with 1 addition and 1 deletion.

@@ -117,7 +117,7 @@ def compute_output_shape(self, input_shape):
     input_dim_j=n_features,
     embeddings_regularizer=regularizers.l2(1e-4)
 )(main_input)
-#d = Dense(4)(embed)
+#d = Dense(1)(embed) # ensure that output dimension is correct
 model = Model(inputs=main_input, outputs=embed)
 model.compile(optimizer='nadam', loss='mse')
 callbacks = [EarlyStopping(patience=3, min_delta=0.001)]
sergeyf created this gist
Oct 22, 2017.
# -*- coding: utf-8 -*-
"""
Created on Sun Oct 22 10:31:23 2017

@author: SF
"""
import numpy as np
from keras.layers import Input
from keras.models import Model
from keras import backend as K
from keras.initializers import RandomNormal
from keras import regularizers
from keras.engine import Layer
from keras.callbacks import EarlyStopping
from fancyimpute import BiScaler
from sklearn.utils import shuffle


class KerasMatrixFactorizer(Layer):

    def __init__(self, latent_dim, input_dim_i, input_dim_j,
                 embeddings_regularizer=None, **kwargs):
        self.latent_dim = latent_dim
        self.input_dim_i = input_dim_i
        self.input_dim_j = input_dim_j
        self.embeddings_regularizer = regularizers.get(embeddings_regularizer)
        super(KerasMatrixFactorizer, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create a trainable weight variable for this layer.
        self.i_embedding = self.add_weight(
            shape=(self.input_dim_i, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='i_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.j_embedding = self.add_weight(
            shape=(self.input_dim_j, self.latent_dim),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(self.latent_dim)),
            name='j_embedding',
            regularizer=self.embeddings_regularizer
        )
        self.i_bias = self.add_weight(
            shape=(self.input_dim_i, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='i_bias',
            regularizer=self.embeddings_regularizer
        )
        self.j_bias = self.add_weight(
            shape=(self.input_dim_j, 1),
            initializer=RandomNormal(mean=0.0, stddev=1 / np.sqrt(2)),
            name='j_bias',
            regularizer=self.embeddings_regularizer
        )
        self.constant = self.add_weight(
            shape=(1, 1),
            initializer='zeros',
            name='constant',
        )
        self.built = True
        super(KerasMatrixFactorizer, self).build(input_shape)

    def call(self, inputs):
        if K.dtype(inputs) != 'int32':
            inputs = K.cast(inputs, 'int32')
        # get the embeddings
        i = inputs[:, 0]  # by convention
        j = inputs[:, 1]
        i_embedding = K.gather(self.i_embedding, i)
        j_embedding = K.gather(self.j_embedding, j)
        i_bias = K.gather(self.i_bias, i)
        j_bias = K.gather(self.j_bias, j)
        # <i_embed, j_embed> + i_bias + j_bias + constant
        out = K.batch_dot(i_embedding, j_embedding, axes=[1, 1])
        out += i_bias + j_bias + self.constant
        return out

    def compute_output_shape(self, input_shape):
        return (input_shape[0], 1)


# step 0: make some data and a mask
from sklearn.datasets import load_digits
X, _ = load_digits(return_X_y=True)
missing_mask = np.random.random(X.shape) < 0.5

# step 1: normalize data before it's flattened
normalizer = BiScaler()
X = normalizer.fit_transform(X)

# step 2: shape data to fit into keras
(n_samples, n_features) = X.shape
observed_mask = ~missing_mask
missing_mask_flat = missing_mask.flatten()
observed_mask_flat = observed_mask.flatten()

columns, rows = np.meshgrid(np.arange(n_features), np.arange(n_samples))

# training data
i_tr = rows.flatten()[observed_mask_flat]
j_tr = columns.flatten()[observed_mask_flat]
ij_tr = np.vstack([i_tr, j_tr]).T  # input to factorizer
y_tr = X.flatten()[observed_mask_flat]  # output of factorizer
ij_tr, y_tr = shuffle(ij_tr, y_tr)

# test data
i_ts = rows.flatten()[missing_mask_flat]
j_ts = columns.flatten()[missing_mask_flat]
ij_ts = np.vstack([i_ts, j_ts]).T  # input to factorizer
y_ts = X.flatten()[missing_mask_flat]  # output of factorizer

# make a keras model
main_input = Input(shape=(2,), dtype='int32')
embed = KerasMatrixFactorizer(
    latent_dim=10,
    input_dim_i=n_samples,
    input_dim_j=n_features,
    embeddings_regularizer=regularizers.l2(1e-4)
)(main_input)
#d = Dense(4)(embed)
model = Model(inputs=main_input, outputs=embed)
model.compile(optimizer='nadam', loss='mse')
callbacks = [EarlyStopping(patience=3, min_delta=0.001)]
model.fit(
    ij_tr, y_tr,
    batch_size=128,
    epochs=500,
    validation_split=0.1,
    callbacks=callbacks,
    shuffle=True,
    verbose=2
)

# test how well it did
y_est = model.predict(ij_ts).T[0]
error = np.mean((y_est - y_ts)**2)
print(error)
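Not part of the gist itself: a minimal follow-up sketch, assuming the variables defined above (model, X, missing_mask, normalizer, rows, columns) and that fancyimpute's BiScaler provides inverse_transform, showing one way the trained factorizer could be used to write predictions back into the missing entries and undo the normalization.

# sketch only: fill the missing entries of X with the model's predictions
ij_all = np.vstack([rows.flatten(), columns.flatten()]).T  # every (row, column) index pair
X_filled = X.copy()
# predictions come back in the same row-major order as the flattened mask
X_filled[missing_mask] = model.predict(ij_all).T[0][missing_mask.flatten()]
# undo the row/column normalization applied by BiScaler earlier (assumes inverse_transform is available)
X_filled = normalizer.inverse_transform(X_filled)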