@tiancheng91
Created July 12, 2019 08:07
test.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# A minimal feed-forward neural network in plain NumPy: Dense layers with a few
# activations, an Adam optimizer, and MSE / categorical cross-entropy losses.

import numpy as np
from numpy import linalg as LA

class Activation:
    def f(self, x, **args):
        raise NotImplementedError("subclasses must implement f()")

    def grad(self, y, dy):
        raise NotImplementedError("subclasses must implement grad()")

class Linear(Activation):
    def f(self, x):
        return x

    def grad(self, y, dy):
        return dy

class Sigmoid(Activation):
    def f(self, x):
        return 1/(1+np.exp(-x))

    def grad(self, y, dy):
        # y is the forward output, so sigmoid'(x) = y*(1-y)
        return y*(1-y)*dy

class Relu(Activation):
    def f(self, x):
        return x*(x>0)

    def grad(self, y, dy):
        return dy*(y>0)

class Softmax(Activation):
    def f(self, x, axis=1):
        # subtract the row-wise max for numerical stability
        x = x-np.max(x, axis=axis, keepdims=True)
        return np.exp(x)/np.sum(np.exp(x), axis=axis, keepdims=True)

    def grad(self, y, dy):
        # Shortcut that assumes dy comes from categorical cross-entropy,
        # i.e. dy = -(1/N)*(t/y); the result then equals (y-t)/N
        # (see the small check right after this class).
        return y/(y.shape[0])+y*dy
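
# A small check sketch for the shortcut above: with one-hot targets t and
# dy = -(1/N)*(t/y), the categorical cross-entropy gradient used further down,
# Softmax.grad should return (y-t)/N. The helper name _check_softmax_grad and
# the sample values are only illustrative.
def _check_softmax_grad():
    t = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])    # one-hot targets, N=2
    logits = np.array([[2.0, 1.0, 0.5], [0.2, 0.1, 1.5]])
    sm = Softmax()
    y = sm.f(logits)
    dy = -(1/t.shape[0])*(t/y)
    print(np.allclose(sm.grad(y, dy), (y-t)/t.shape[0]))  # expected: True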

class Dense:
    activation_map = {
        'relu': Relu,
        'softmax': Softmax,
        'sigmoid': Sigmoid,
        'linear': Linear,
    }

    def __init__(self, output_dim, input_dim=0, activation='relu'):
        self.output_dim = output_dim
        self.input_dim = input_dim
        if activation in self.activation_map:
            self.activation = self.activation_map[activation]()
        else:
            raise ValueError('activation %s not implemented' % activation)

    def initialize_parameter(self):
        # Glorot-style initialization scaled by fan-in + fan-out
        self.w = np.random.randn(self.input_dim, self.output_dim)*np.sqrt(6/(self.input_dim+self.output_dim))
        self.b = np.zeros((1, self.output_dim))

    def initialize_optimizer(self, optimizer, l_rate):
        self.optimizer = optimizer
        self.l_rate = l_rate
        if self.optimizer == 'adam':
            # step counter and first/second moment estimates for w and b
            self.t, self.s_w, self.r_w, self.s_b, self.r_b = 0, 0, 0, 0, 0
            self.rho1, self.rho2, self.delta = 0.9, 0.999, 1e-8
        else:
            raise ValueError('optimizer %s not implemented' % self.optimizer)

    def forward(self, x):
        self.x = x
        self.h = np.dot(self.x, self.w)+self.b
        self.a = self.activation.f(self.h)
        return self.a

    def backward(self, da):
        self.da = da
        self.dh = self.activation.grad(self.a, self.da)
        # da already carries the 1/batch factor from the loss gradient,
        # so no extra scaling is applied here
        self.dw = np.dot(self.x.T, self.dh)
        self.db = np.sum(self.dh, axis=0, keepdims=True)
        self.dx = np.dot(self.dh, self.w.T)
        return self.dx

    def update_parameter(self):
        if self.optimizer == 'adam':
            self.t = self.t+1
            # biased first/second moment estimates, then bias correction
            self.s_w = self.rho1*self.s_w+(1-self.rho1)*self.dw
            self.r_w = self.rho2*self.r_w+(1-self.rho2)*(self.dw**2)
            s_w_ = self.s_w/(1-self.rho1**self.t)
            r_w_ = self.r_w/(1-self.rho2**self.t)
            self.w = self.w-self.l_rate*s_w_/(np.sqrt(r_w_)+self.delta)
            self.s_b = self.rho1*self.s_b+(1-self.rho1)*self.db
            self.r_b = self.rho2*self.r_b+(1-self.rho2)*(self.db**2)
            s_b_ = self.s_b/(1-self.rho1**self.t)
            r_b_ = self.r_b/(1-self.rho2**self.t)
            self.b = self.b-self.l_rate*s_b_/(np.sqrt(r_b_)+self.delta)
        else:
            raise ValueError('optimizer %s not implemented' % self.optimizer)

class Sequential:
    def __init__(self):
        self.layers = []
        self.loss = 'categorical_crossentropy'
        self.optimizer = 'adam'

    def add(self, layer):
        self.layers.append(layer)

    def compile(self, loss='categorical_crossentropy', optimizer='adam', l_rate=0.001):
        self.loss = loss
        self.optimizer = optimizer
        # propagate each layer's output size to the next layer's input size
        for idx in range(len(self.layers)-1):
            self.layers[idx+1].input_dim = self.layers[idx].output_dim
        for layer in self.layers:
            layer.initialize_optimizer(optimizer, l_rate)

    def forward_propagation(self, x, y):
        for layer in self.layers:
            a = layer.forward(x)
            x = a
        if self.loss == 'categorical_crossentropy':
            loss = -(1/y.shape[0])*np.sum(np.log(a)*y)
        elif self.loss == 'mse':
            loss = 0.5*(1/y.shape[0])*np.square(LA.norm(a-y))
        else:
            raise ValueError('loss %s not implemented' % self.loss)
        return a, loss

    def backward_propagation(self, a, y):
        # gradient of the loss w.r.t. the network output, averaged over the batch
        if self.loss == 'categorical_crossentropy':
            da = -(1/y.shape[0])*(y/a)
        elif self.loss == 'mse':
            da = (1/y.shape[0])*(a-y)
        else:
            raise ValueError('loss %s not implemented' % self.loss)
        for layer in self.layers[::-1]:
            da = layer.backward(da)
            layer.update_parameter()

    def fit(self, x, y, epochs=10, batch_size=200):
        for layer in self.layers:
            layer.initialize_parameter()
        batch_count = int(x.shape[0]/batch_size)
        for i in range(epochs):
            for j in range(batch_count):
                start, end = j*batch_size, (j+1)*batch_size
                a, _ = self.forward_propagation(x[start:end], y[start:end])
                self.backward_propagation(a, y[start:end])
            # report the loss on the full training set once per epoch
            _, loss = self.forward_propagation(x, y)
            print("epoch %d/%d: loss %f" % (i+1, epochs, loss))

    def print_parameters(self):
        for idx, layer in enumerate(self.layers):
            print('layer %d parameters:' % (idx+1))
            print(layer.w, layer.b)
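
# A quick sanity-check sketch: finite-difference check of one weight gradient
# against backward(), assuming the 'mse' loss path above. The helper name
# _gradient_check and its defaults are only illustrative; it is not called
# by the demo below.
def _gradient_check(eps=1e-5):
    np.random.seed(0)
    x = np.random.randn(8, 3)
    y = np.random.randn(8, 1)
    net = Sequential()
    net.add(Dense(1, input_dim=3, activation='linear'))
    net.compile(loss='mse', optimizer='adam')
    layer = net.layers[0]
    layer.initialize_parameter()
    a, _ = net.forward_propagation(x, y)
    da = (1/y.shape[0])*(a-y)        # same mse gradient used in backward_propagation
    layer.backward(da)               # fills layer.dw without updating parameters
    analytic = layer.dw[0, 0]
    # central difference on the same weight entry
    layer.w[0, 0] += eps
    _, loss_plus = net.forward_propagation(x, y)
    layer.w[0, 0] -= 2*eps
    _, loss_minus = net.forward_propagation(x, y)
    layer.w[0, 0] += eps
    numeric = (loss_plus-loss_minus)/(2*eps)
    print('analytic %.6f vs numeric %.6f' % (analytic, numeric))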

# A simple linear regression demo
if __name__ == '__main__':
    w, b = np.array([[1.0], [2.0], [3.0]]), 5
    x = np.random.randn(300, 3)*100
    noise = np.random.randn(300, 1)*0.1
    y = np.dot(x, w)+noise+b
    model = Sequential()
    model.add(Dense(1, input_dim=3, activation='linear'))
    model.compile(loss='mse', optimizer='adam')
    model.fit(x, y, epochs=5000, batch_size=100)
    model.print_parameters()
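
# A minimal usage sketch for the classification path (softmax output with
# categorical cross-entropy), assuming one-hot targets as expected by the loss
# above. The helper name _classification_demo and the synthetic data are only
# illustrative; invoke it manually, it is not run by the demo above.
def _classification_demo():
    np.random.seed(1)
    x = np.random.randn(300, 4)
    labels = (x[:, 0]+x[:, 1] > 0).astype(int)
    y = np.eye(2)[labels]                      # one-hot targets, shape (300, 2)
    clf = Sequential()
    clf.add(Dense(8, input_dim=4, activation='relu'))
    clf.add(Dense(2, activation='softmax'))
    clf.compile(loss='categorical_crossentropy', optimizer='adam', l_rate=0.01)
    clf.fit(x, y, epochs=50, batch_size=50)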