# Author: Kyle Kastner
# License: BSD 3-Clause
import os
import struct
import time
from array import array

import numpy as np
import torch as th
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import TensorDataset, DataLoader


# Modified from https://github.com/sorki/python-mnist
# License: BSD 3-Clause
def load_mnist(which_set, data_path="data/mnist"):
    if data_path[-1] != os.sep:
        data_path = data_path + os.sep
    train_image_file = data_path + "train-images-idx3-ubyte"
    train_label_file = data_path + "train-labels-idx1-ubyte"
    test_image_file = data_path + "t10k-images-idx3-ubyte"
    test_label_file = data_path + "t10k-labels-idx1-ubyte"
    if which_set not in ["train", "valid", "test"]:
        raise ValueError("Unknown argument setting for which_set: %s"
                         % which_set)

    if which_set in ["train", "valid"]:
        path_lbl = train_label_file
        path_img = train_image_file
    elif which_set == "test":
        path_lbl = test_label_file
        path_img = test_image_file

    with open(path_lbl, 'rb') as f:
        magic, size = struct.unpack(">II", f.read(8))
        if magic != 2049:
            raise ValueError('Magic number mismatch, expected 2049, '
                             'got {}'.format(magic))
        labels = array("B", f.read())

    with open(path_img, 'rb') as f:
        magic, size, rows, cols = struct.unpack(">IIII", f.read(16))
        if magic != 2051:
            raise ValueError('Magic number mismatch, expected 2051, '
                             'got {}'.format(magic))
        image_data = array("B", f.read())

    images = []
    for i in range(size):
        images.append([0] * rows * cols)

    for i in range(size):
        images[i][:] = image_data[i * rows * cols:(i + 1) * rows * cols]

    images = np.array(images, dtype=np.float32)
    labels = np.array(labels, dtype=np.int64)
    if which_set == "train":
        # first 50000 examples of the training file
        images = images[:50000]
        labels = labels[:50000]
    elif which_set == "valid":
        # hold out the last 10000 training examples for validation
        images = images[50000:]
        labels = labels[50000:]
    elif which_set == "test":
        # test set comes from its own files, use it unchanged
        pass
    return images, labels


def numpy_softmax(arr):
    # subtract the per-row max before exponentiating, for numerical stability
    maxes = np.amax(arr, axis=1)
    maxes = maxes.reshape(maxes.shape[0], 1)
    e = np.exp(arr - maxes)
    dist = e / np.sum(e, axis=1, keepdims=True)
    return dist


def weights_init(mod):
    # initialize weights by layer type, dispatched on the class name
    classname = mod.__class__.__name__
    if classname.find('Conv') != -1:
        mod.weight.data.normal_(0.0, 0.2)
    elif classname.find('Linear') != -1:
        mod.weight.data.normal_(0.0, 0.1)
    elif classname.find('BatchNorm') != -1:
        mod.weight.data.normal_(1.0, 0.02)
        mod.bias.data.fill_(0)


class Net(nn.Module):
    def __init__(self, input_dim, target_dim):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_dim, 500)
        self.fc2 = nn.Linear(500, target_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


minibatch_size = 20
n_epochs = 1000

train_images, train_labels = load_mnist("train")
valid_images, valid_labels = load_mnist("valid")
test_images, test_labels = load_mnist("test")

n_input = 28 * 28
n_target = 10

train_dataset = TensorDataset(th.from_numpy(train_images),
                              th.from_numpy(train_labels))
train_dataloader = DataLoader(train_dataset, batch_size=minibatch_size,
                              shuffle=True, num_workers=2, pin_memory=True)

valid_dataset = TensorDataset(th.from_numpy(valid_images),
                              th.from_numpy(valid_labels))
valid_dataloader = DataLoader(valid_dataset, batch_size=minibatch_size,
                              shuffle=True, num_workers=2, pin_memory=True)

net = Net(n_input, n_target)
net.apply(weights_init)

cuda = True
if cuda:
    net = net.cuda()

#optimizer = optim.SGD(net.parameters(), lr=0.01)
optimizer = optim.Adam(net.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()


def predict_function(inp):
    return net(inp)


def cost_function(inp, target):
    output = net(inp)
    loss = criterion(output, target)
    return loss


def fit_function(inp, target):
    optimizer.zero_grad()  # zero the gradient buffers
    loss = cost_function(inp, target)
    loss.backward()
    optimizer.step()  # does the update
    return loss


tot_start = time.time()
for e in range(n_epochs):
    sum_avg_loss = 0.
    check_minibatches = 0
    start = time.time()
    for data in train_dataloader:
        X_mb, y_mb = data
        if cuda:
            X_mb = X_mb.cuda()
            y_mb = y_mb.cuda()
        inp = Variable(X_mb)
        target = Variable(y_mb)
        l = fit_function(inp, target)
        sum_avg_loss += l.cpu().data.numpy()
        check_minibatches += 1
    end = time.time()

    total_wrong = 0
    total = 0
    for data in valid_dataloader:
        X_mb, y_mb = data
        y_true = y_mb.cpu().numpy()
        if cuda:
            X_mb = X_mb.cuda()
        inp = Variable(X_mb)
        lin_pred = predict_function(inp)
        lin_pred = lin_pred.cpu().data.numpy()
        pred = numpy_softmax(lin_pred)
        wrong = sum(y_true.astype("int32") != pred.argmax(axis=-1))
        total_wrong += wrong
        total += y_true.shape[0]

    print("Epoch %i" % e)
    print("Training loop time: %s seconds" % str(end - start))
    print("Training loss: %s" % str(float(sum_avg_loss) / check_minibatches))
    print("Valid error: %s" % str(float(total_wrong) / total))
tot_end = time.time()
print("Overall time %s" % (tot_end - tot_start))
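
# A minimal sanity-check sketch (not part of the original script): compares
# numpy_softmax against PyTorch's own F.softmax on random logits, to confirm
# the max-subtraction trick above yields the same probabilities. Assumes it
# runs after the definitions above; the underscore-prefixed names are
# hypothetical locals introduced here for illustration only.
_logits = np.random.randn(4, 10).astype(np.float32)
_np_probs = numpy_softmax(_logits)
_th_probs = F.softmax(Variable(th.from_numpy(_logits)), dim=-1).data.numpy()
assert np.allclose(_np_probs, _th_probs, atol=1e-6)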