# Example for my blog post at:
# http://danijar.com/introduction-to-recurrent-networks-in-tensorflow/
import functools

import sets
import tensorflow as tf


def lazy_property(function):
    # Cache the property's result so the TensorFlow ops it defines are
    # added to the graph only once, on first access.
    attribute = '_' + function.__name__

    @property
    @functools.wraps(function)
    def wrapper(self):
        if not hasattr(self, attribute):
            setattr(self, attribute, function(self))
        return getattr(self, attribute)
    return wrapper


class SequenceLabelling:

    def __init__(self, data, target, dropout, num_hidden=200, num_layers=3):
        self.data = data
        self.target = target
        self.dropout = dropout
        self._num_hidden = num_hidden
        self._num_layers = num_layers
        # Touch the lazy properties once so the full graph is built here.
        self.prediction
        self.error
        self.optimize

    @lazy_property
    def prediction(self):
        # Recurrent network.
        network = tf.nn.rnn_cell.GRUCell(self._num_hidden)
        network = tf.nn.rnn_cell.DropoutWrapper(
            network, output_keep_prob=self.dropout)
        network = tf.nn.rnn_cell.MultiRNNCell([network] * self._num_layers)
        output, _ = tf.nn.dynamic_rnn(network, self.data, dtype=tf.float32)
        # Softmax layer.
        max_length = int(self.target.get_shape()[1])
        num_classes = int(self.target.get_shape()[2])
        weight, bias = self._weight_and_bias(self._num_hidden, num_classes)
        # Flatten to apply the same weights to all time steps.
        output = tf.reshape(output, [-1, self._num_hidden])
        prediction = tf.nn.softmax(tf.matmul(output, weight) + bias)
        prediction = tf.reshape(prediction, [-1, max_length, num_classes])
        return prediction

    @lazy_property
    def cost(self):
        # Cross entropy summed over time steps and classes, then averaged
        # over the batch.
        cross_entropy = -tf.reduce_sum(
            self.target * tf.log(self.prediction), [1, 2])
        cross_entropy = tf.reduce_mean(cross_entropy)
        return cross_entropy

    @lazy_property
    def optimize(self):
        learning_rate = 0.003
        optimizer = tf.train.RMSPropOptimizer(learning_rate)
        return optimizer.minimize(self.cost)

    @lazy_property
    def error(self):
        # Fraction of time steps where the most likely class is wrong.
        mistakes = tf.not_equal(
            tf.argmax(self.target, 2), tf.argmax(self.prediction, 2))
        return tf.reduce_mean(tf.cast(mistakes, tf.float32))

    @staticmethod
    def _weight_and_bias(in_size, out_size):
        weight = tf.truncated_normal([in_size, out_size], stddev=0.01)
        bias = tf.constant(0.1, shape=[out_size])
        return tf.Variable(weight), tf.Variable(bias)


def read_dataset():
    # Load the OCR dataset, one-hot encode the targets and flatten each
    # letter image into a feature vector per time step.
    dataset = sets.Ocr()
    dataset = sets.OneHot(dataset.target, depth=2)(dataset, columns=['target'])
    dataset['data'] = dataset.data.reshape(
        dataset.data.shape[:-2] + (-1,)).astype(float)
    train, test = sets.Split(0.66)(dataset)
    return train, test


if __name__ == '__main__':
    train, test = read_dataset()
    _, length, image_size = train.data.shape
    num_classes = train.target.shape[2]
    data = tf.placeholder(tf.float32, [None, length, image_size])
    target = tf.placeholder(tf.float32, [None, length, num_classes])
    dropout = tf.placeholder(tf.float32)
    model = SequenceLabelling(data, target, dropout)
    sess = tf.Session()
    sess.run(tf.initialize_all_variables())
    for epoch in range(10):
        for _ in range(100):
            batch = train.sample(10)
            sess.run(model.optimize, {
                data: batch.data, target: batch.target, dropout: 0.5})
        # Evaluate on the test split with dropout disabled.
        error = sess.run(model.error, {
            data: test.data, target: test.target, dropout: 1})
        print('Epoch {:2d} error {:3.1f}%'.format(epoch + 1, 100 * error))
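
    # Optional inference sketch: run the trained model on one held-out
    # sequence and print the most likely class at each time step. This
    # assumes test.sample(1) behaves like train.sample(10) above; the
    # prediction op only needs the data and dropout feeds, since it reads
    # the target placeholder's static shape rather than its value.
    single = test.sample(1)
    predicted = sess.run(model.prediction, {data: single.data, dropout: 1})
    print('Predicted classes:', predicted.argmax(axis=2))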