from __future__ import print_function import argparse import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transforms class SamNet(nn.Module): def __init__(self): super(SamNet, self).__init__() # 1 input image, 10 output channels, 5x5 square convolution kernel self.conv1 = nn.Conv2d(1, 10, kernel_size=5) # 10 input channels, 10 output channels, 5x5 square convolution kernel self.conv2 = nn.Conv2d(10, 10, kernel_size=5) self.conv2_drop = nn.Dropout2d() # Only need 2 neurons for output self.fc1 = nn.Linear(160, 2) def forward(self, x): """ You just have to define the forward function, and the backward function (where gradients are computed) is automatically defined for you using autograd. You can use any of the Tensor operations in the forward function. """ # Max pooling over a 2x2 window x = F.relu(F.max_pool2d(self.conv1(x), 2)) x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) x = x.view(-1, 160) x = self.fc1(x) return F.log_softmax(x, dim=1) class AlexNet(nn.Module): def __init__(self): super(AlexNet, self).__init__() self.conv1 = nn.Conv2d(1, 10, kernel_size=5) self.conv2 = nn.Conv2d(10, 20, kernel_size=5) self.conv2_drop = nn.Dropout2d() self.fc1 = nn.Linear(320, 50) self.fc2 = nn.Linear(50, 10) def forward(self, x): x = F.relu(F.max_pool2d(self.conv1(x), 2)) x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) x = x.view(-1, 320) x = F.relu(self.fc1(x)) x = F.dropout(x, training=self.training) x = self.fc2(x) return F.log_softmax(x, dim=1) def train(args, model, device, train_loader, epoch): optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum) model.train() for batch_idx, (data, target) in enumerate(train_loader): for ind, y_val in enumerate(target): target[ind] = 0 if y_val < 5 else 1 data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) # nll_loss = negative log likelihood loss # output = tensor of N x C x H x W in this case of 2D loss # target = tensor of ground truth loss = F.nll_loss(output, target) loss.backward() optimizer.step() if batch_idx % args.log_interval == 0: print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, batch_idx * len(data), len(train_loader.dataset), 100. * batch_idx / len(train_loader), loss.item())) def test(args, model, device, test_loader): model.eval() test_loss = 0 correct = 0 with torch.no_grad(): for data, target in test_loader: for ind, y_val in enumerate(target): target[ind] = 0 if y_val < 5 else 1 data, target = data.to(device), target.to(device) output = model(data) test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability correct += pred.eq(target.view_as(pred)).sum().item() test_loss /= len(test_loader.dataset) print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( test_loss, correct, len(test_loader.dataset), 100. * correct / len(test_loader.dataset))) def load_data(args, use_cuda): kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {} train_loader = torch.utils.data.DataLoader( datasets.MNIST('../data', train=True, download=True, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ])), batch_size=args.batch_size, shuffle=True, **kwargs) test_loader = torch.utils.data.DataLoader( datasets.MNIST('../data', train=False, transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ])), batch_size=args.test_batch_size, shuffle=True, **kwargs) return train_loader, test_loader def parse_args(): # Training settings parser = argparse.ArgumentParser(description='PyTorch MNIST Example') parser.add_argument('--batch-size', type=int, default=64, metavar='N', help='input batch size for training (default: 64)') parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', help='input batch size for testing (default: 1000)') parser.add_argument('--epochs', type=int, default=3, metavar='N', help='number of epochs to train (default: 10)') parser.add_argument('--lr', type=float, default=0.01, metavar='LR', help='learning rate (default: 0.01)') parser.add_argument('--momentum', type=float, default=0.5, metavar='M', help='SGD momentum (default: 0.5)') parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed (default: 1)') parser.add_argument('--log-interval', type=int, default=10, metavar='N', help='how many batches to wait before logging training status') return parser.parse_args() def main(): args = parse_args() use_cuda = torch.cuda.is_available() device = torch.device("cuda" if use_cuda else "cpu") train_loader, test_loader = load_data(args, use_cuda) torch.manual_seed(args.seed) model = SamNet().to(device) # model = AlexNet().to(device) for epoch in range(1, args.epochs + 1): train(args, model, device, train_loader, epoch) test(args, model, device, test_loader) if __name__ == '__main__': main()