@benanne
Created December 4, 2014 15:18

gistfile1.py

import numpy as np
import theano
import theano.tensor as T
import nntools


class GalaxyDivNormLayer(nntools.layers.Layer):
    """
    Rectification + divisive normalization.
    """
    def __init__(self, input_layer):
        super(GalaxyDivNormLayer, self).__init__(input_layer)

        # One slice per Galaxy Zoo question; each slice selects that question's answer columns.
        self.question_slices = [slice(0, 3), slice(3, 5), slice(5, 7), slice(7, 9), slice(9, 13), slice(13, 15),
                                slice(15, 18), slice(18, 25), slice(25, 28), slice(28, 31), slice(31, 37)]

        self.normalisation_mask = theano.shared(self.generate_normalisation_mask())
        # self.scaling_mask = theano.shared(self.generate_scaling_mask())

        # Sequence of scaling steps to be undertaken.
        # The first element is a slice indicating the values to be scaled; the second element is an
        # index indicating the scale factor. These steps have to happen IN ORDER, because some scale
        # factors (e.g. A2.2 in step II) are themselves rescaled by an earlier step.
        self.scaling_sequence = [
            (slice(3, 5), 1),     # I:   rescale Q2 by A1.2
            (slice(5, 13), 4),    # II:  rescale Q3, Q4, Q5 by A2.2
            (slice(15, 18), 0),   # III: rescale Q7 by A1.1
            (slice(18, 25), 13),  # IV:  rescale Q8 by A6.1
            (slice(25, 28), 3),   # V:   rescale Q9 by A2.1
            (slice(28, 37), 7),   # VI:  rescale Q10, Q11 by A4.1
        ]

    def generate_normalisation_mask(self):
        """
        When the clipped input is multiplied by this mask, the per-question normalisation
        denominators are generated, so the input can then be divided by the normalisation
        constants elementwise.
        """
        mask = np.zeros((37, 37), dtype=theano.config.floatX)
        for s in self.question_slices:
            mask[s, s] = 1.0  # block-diagonal: ones within each question's answer group
        return mask
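
    # Illustrative toy example (not part of the original gist): with question_slices
    # equal to [slice(0, 2), slice(2, 4)], generate_normalisation_mask would return
    #     [[1, 1, 0, 0],
    #      [1, 1, 0, 0],
    #      [0, 0, 1, 1],
    #      [0, 0, 1, 1]]
    # so np.dot(x, mask) replaces every entry of x with the sum of the answer group it
    # belongs to, which is exactly the normalisation denominator needed for that entry.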

    def get_output_for(self, input, normalize=True, *args, **kwargs):
        """
        Set normalize to False for the first few iterations to find a good region of
        parameter space; normalization tends to complicate the initial steps.
        """
        if normalize:
            return self.weighted_answer_probabilities(input)
        else:
            # Clip on both sides: any predictions over 1.0 would get normalised away anyway.
            input_clipped = T.clip(input, 0, 1)
            return input_clipped

    def answer_probabilities(self, input):
        """
        Normalise the answer groups for each question, so each group sums to 1.
        """
        input_clipped = T.maximum(input, 0)
        # Small constant added to prevent division by zero.
        normalization_denoms = T.dot(input_clipped, self.normalisation_mask) + 1e-12
        input_normalized = input_clipped / normalization_denoms
        return input_normalized

    def weighted_answer_probabilities(self, input):
        probs = self.answer_probabilities(input)
        # Go through the rescaling sequence in order (6 steps).
        for probs_slice, scale_idx in self.scaling_sequence:
            probs = T.set_subtensor(probs[:, probs_slice],
                                    probs[:, probs_slice] * probs[:, scale_idx].dimshuffle(0, 'x'))
        return probs
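
A minimal NumPy-only sketch of the same forward pass, handy for sanity-checking the
arithmetic without Theano or nntools. The function name forward_numpy and the test
input are illustrative assumptions, not part of the original gist.

import numpy as np

def forward_numpy(x, question_slices, scaling_sequence):
    # x has shape (batch_size, 37): one raw network output per answer.
    x = np.maximum(x, 0)  # rectify
    denoms = np.zeros_like(x)
    for s in question_slices:
        # Per-question sum, broadcast across that question's answer columns.
        denoms[:, s] = x[:, s].sum(axis=1, keepdims=True)
    probs = x / (denoms + 1e-12)  # divisive normalisation
    for probs_slice, scale_idx in scaling_sequence:
        # Apply the rescaling steps in order. In-place is safe here because
        # scale_idx never falls inside probs_slice within the same step.
        probs[:, probs_slice] *= probs[:, scale_idx:scale_idx + 1]
    return probs

# Example usage (hypothetical): x = np.random.rand(2, 37).astype('float32')
# p = forward_numpy(x, layer.question_slices, layer.scaling_sequence)
# Before rescaling, each question's answers sum to 1; after rescaling, each group
# sums to its scale factor, i.e. the weighted probability of the answer that leads
# to that question in the decision tree.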