class GalaxyDivNormLayer(nntools.layers.Layer):
    """
    Rectification + divisive normalization for the 37 galaxy-zoo answer
    probabilities.

    Answers are grouped per question; each group is rectified and normalised
    to sum to (at most) 1, then rescaled by the probability of the answer
    that leads into that question, following the decision-tree structure of
    the task.
    """

    def __init__(self, input_layer):
        super(GalaxyDivNormLayer, self).__init__(input_layer)

        # Answer-group boundaries: consecutive pairs delimit the answers
        # belonging to each of the 11 questions.
        boundaries = [0, 3, 5, 7, 9, 13, 15, 18, 25, 28, 31, 37]
        self.question_slices = [slice(lo, hi)
                                for lo, hi in zip(boundaries[:-1], boundaries[1:])]

        self.normalisation_mask = theano.shared(self.generate_normalisation_mask())

        # Sequence of scaling steps, applied IN ORDER (later steps depend on
        # the results of earlier ones, else it doesn't work correctly).
        # Each entry: (slice of answers to rescale, index of the answer
        # supplying the scale factor).
        self.scaling_sequence = [
            (slice(3, 5), 1),     # I: rescale Q2 by A1.2
            (slice(5, 13), 4),    # II: rescale Q3, Q4, Q5 by A2.2
            (slice(15, 18), 0),   # III: rescale Q7 by A1.1
            (slice(18, 25), 13),  # IV: rescale Q8 by A6.1
            (slice(25, 28), 3),   # V: rescale Q9 by A2.1
            (slice(28, 37), 7),   # VI: rescale Q10, Q11 by A4.1
        ]

    def generate_normalisation_mask(self):
        """
        Build a (37, 37) block-diagonal 0/1 matrix.

        Multiplying the clipped input by this mask sums each answer's question
        group, producing the per-answer normalisation denominator, so the
        input can then simply be divided elementwise by those constants.
        """
        mask = np.zeros((37, 37), dtype=theano.config.floatX)
        for group in self.question_slices:
            mask[group, group] = 1.0
        return mask

    def get_output_for(self, input, normalize=True, *args, **kwargs):
        """
        Set normalize to False for the first few iterations to find a good
        region of parameter space; normalization tends to complicate the
        initial steps.
        """
        if not normalize:
            # Clip on both sides: any predictions over 1.0 would get
            # normalised away anyway.
            return T.clip(input, 0, 1)
        return self.weighted_answer_probabilities(input)

    def answer_probabilities(self, input):
        """
        Normalise the answer groups for each question.
        """
        rectified = T.maximum(input, 0)
        # Small constant prevents division by zero when a whole group is 0.
        denominators = T.dot(rectified, self.normalisation_mask) + 1e-12
        return rectified / denominators

    def weighted_answer_probabilities(self, input):
        """
        Normalise per question, then rescale each dependent question's
        answers by the probability of the answer leading to it (the six
        rescaling steps run in dependency order).
        """
        probs = self.answer_probabilities(input)
        for target_slice, factor_idx in self.scaling_sequence:
            factor = probs[:, factor_idx].dimshuffle(0, 'x')
            probs = T.set_subtensor(probs[:, target_slice],
                                    probs[:, target_slice] * factor)
        return probs