Revisions
CPPAlien revised this gist
Sep 22, 2024 · 2 changed files with 124 additions and 132 deletions.
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import numpy as np
import pickle

# Hyperparameters
H = 200  # Number of hidden layer neurons
batch_size = 10  # Every how many episodes to do a param update?
learning_rate = 1e-4
gamma = 0.99  # Discount factor for reward
decay_rate = 0.99  # Decay factor for RMSProp leaky sum of grad^2
resume = False  # Resume from previous checkpoint?
render = False

# Model initialization
D = 80 * 80  # Input dimensionality: 80x80 grid
model = nn.Sequential(
    nn.Linear(D, H),
    nn.ReLU(),
    nn.Linear(H, 2)  # Output layer for two actions
)
optimizer = optim.RMSprop(model.parameters(), lr=learning_rate, alpha=decay_rate)

if resume:
    model.load_state_dict(torch.load('save.p'))

def prepro(I):
    """ Preprocess 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
    I = np.array(I[0]) if isinstance(I, tuple) else np.array(I)  # env.reset() may return (obs, info)
    I = I[35:195]  # Crop
    if I.ndim == 3:
        I = I[::2, ::2, 0]  # Downsample by factor of 2, keep one channel
    else:
        I = I[::2, ::2]  # Already 2-D: downsample accordingly
    I[I == 144] = 0  # Erase background (background type 1)
    I[I == 109] = 0  # Erase background (background type 2)
    I[I != 0] = 1  # Everything else (paddles, ball) just set to 1
    return torch.FloatTensor(I).view(-1)  # Convert to PyTorch tensor and flatten

def discount_rewards(r):
    """ Take 1D float array of rewards and compute discounted reward """
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(r.size)):
        if r[t] != 0:
            running_add = 0  # Reset the sum, since this was a game boundary (Pong-specific!)
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r

def policy_forward(x):
    h = model(x)
    p = torch.softmax(h, dim=0)  # Apply softmax to get action probabilities
    return p

env = gym.make("PongNoFrameskip-v4")
observation = env.reset()
prev_x = None  # Used in computing the difference frame
xs, hs, dlogps, drs = [], [], [], []
running_reward = None
reward_sum = 0
episode_number = 0

while True:
    if render:
        env.render()

    # Preprocess the observation, set input to network to be difference image
    cur_x = prepro(observation)
    x = cur_x - prev_x if prev_x is not None else torch.zeros(D)
    prev_x = cur_x

    # Forward the policy network and sample an action from the returned probability
    aprob = policy_forward(x)
    action = 2 if torch.rand(1).item() < aprob[1] else 3  # Roll the dice!
    # Record various intermediates (needed later for backprop)
    xs.append(x)  # Observation
    hs.append(aprob)  # Action probabilities (kept for inspection; not used in the update)
    y = 1 if action == 2 else 0  # A "fake label": index of the action actually taken
    dlogps.append(torch.log(aprob[y]))  # Log-probability of the taken action (carries grad)

    # Step the environment and get new measurements
    result = env.step(action)
    if len(result) == 5:  # Gymnasium API: obs, reward, terminated, truncated, info
        observation, reward, terminated, truncated, info = result
        done = terminated or truncated
    else:  # Classic Gym API: obs, reward, done, info
        observation, reward, done, info = result
    reward_sum += reward
    drs.append(reward)  # Record reward (must be done after step() to get reward for previous action)

    if done:  # An episode finished
        episode_number += 1

        # Stack together all inputs, action log-probs, and rewards for this episode
        epx = torch.stack(xs)  # Kept for parity with the original script; autograd handles the backward pass
        epdlogp = torch.stack(dlogps)
        epr = np.array(drs, dtype=np.float32)
        xs, hs, dlogps, drs = [], [], [], []  # Reset array memory

        # Compute the discounted reward backwards through time
        discounted_epr = torch.from_numpy(discount_rewards(epr))
        discounted_epr -= discounted_epr.mean()  # Standardize the rewards
        discounted_epr /= discounted_epr.std()

        # Policy-gradient loss: negative log-likelihood of the taken actions,
        # weighted by advantage (uses every step of the episode, not just the last one)
        optimizer.zero_grad()
        loss = -torch.sum(epdlogp * discounted_epr)
        loss.backward()
        optimizer.step()

        # Boring bookkeeping
        running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
        print('Resetting env. Episode reward total was %f. Running mean: %f' % (reward_sum, running_reward))
        if episode_number % 100 == 0:
            torch.save(model.state_dict(), 'save.p')
        reward_sum = 0
        observation = env.reset()  # Reset env
        prev_x = None

    if reward != 0:  # Pong has either +1 or -1 reward exactly when game ends.
        print('Episode %d: Game finished, reward: %f' % (episode_number, reward) + ('' if reward == -1 else ' !!!!!!!!'))
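To make the return computation concrete, here is a minimal, self-contained sketch (the function is copied from the script above; the toy reward sequence is an assumed example) showing how discount_rewards propagates each game's ±1 reward backwards through time and resets at game boundaries:

import numpy as np

gamma = 0.99  # Same discount factor as the script above

def discount_rewards(r):
    """ Same logic as the gist: discounted returns with Pong-specific resets. """
    discounted_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(r.size)):
        if r[t] != 0:
            running_add = 0  # Reset at game boundary (a point was just scored)
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r

# Toy episode: two games, the first lost (-1), the second won (+1)
r = np.array([0.0, 0.0, -1.0, 0.0, 1.0])
print(discount_rewards(r))
# -> [-0.9801 -0.99   -1.      0.99    1.    ]
# Every step inherits its game's final reward, discounted by how many
# steps before the point it occurred; the reset keeps games independent.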
karpathy created this gist
May 30, 2016.
""" Trains an agent with (stochastic) Policy Gradients on Pong. Uses OpenAI Gym. """
import numpy as np
import cPickle as pickle
import gym

# hyperparameters
H = 200 # number of hidden layer neurons
batch_size = 10 # every how many episodes to do a param update?
learning_rate = 1e-4
gamma = 0.99 # discount factor for reward
decay_rate = 0.99 # decay factor for RMSProp leaky sum of grad^2
resume = False # resume from previous checkpoint?
render = False

# model initialization
D = 80 * 80 # input dimensionality: 80x80 grid
if resume:
  model = pickle.load(open('save.p', 'rb'))
else:
  model = {}
  model['W1'] = np.random.randn(H,D) / np.sqrt(D) # "Xavier" initialization
  model['W2'] = np.random.randn(H) / np.sqrt(H)

grad_buffer = { k : np.zeros_like(v) for k,v in model.iteritems() } # update buffers that add up gradients over a batch
rmsprop_cache = { k : np.zeros_like(v) for k,v in model.iteritems() } # rmsprop memory

def sigmoid(x):
  return 1.0 / (1.0 + np.exp(-x)) # sigmoid "squashing" function to interval [0,1]

def prepro(I):
  """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
  I = I[35:195] # crop
  I = I[::2,::2,0] # downsample by factor of 2
  I[I == 144] = 0 # erase background (background type 1)
  I[I == 109] = 0 # erase background (background type 2)
  I[I != 0] = 1 # everything else (paddles, ball) just set to 1
  return I.astype(np.float).ravel()

def discount_rewards(r):
  """ take 1D float array of rewards and compute discounted reward """
  discounted_r = np.zeros_like(r)
  running_add = 0
  for t in reversed(xrange(0, r.size)):
    if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)
    running_add = running_add * gamma + r[t]
    discounted_r[t] = running_add
  return discounted_r

def policy_forward(x):
  h = np.dot(model['W1'], x)
  h[h<0] = 0 # ReLU nonlinearity
  logp = np.dot(model['W2'], h)
  p = sigmoid(logp)
  return p, h # return probability of taking action 2, and hidden state

def policy_backward(eph, epdlogp):
  """ backward pass. (eph is array of intermediate hidden states) """
  dW2 = np.dot(eph.T, epdlogp).ravel()
  dh = np.outer(epdlogp, model['W2'])
  dh[eph <= 0] = 0 # backprop ReLU
  dW1 = np.dot(dh.T, epx)
  return {'W1':dW1, 'W2':dW2}

env = gym.make("Pong-v0")
observation = env.reset()
prev_x = None # used in computing the difference frame
xs,hs,dlogps,drs = [],[],[],[]
running_reward = None
reward_sum = 0
episode_number = 0
while True:
  if render: env.render()

  # preprocess the observation, set input to network to be difference image
  cur_x = prepro(observation)
  x = cur_x - prev_x if prev_x is not None else np.zeros(D)
  prev_x = cur_x

  # forward the policy network and sample an action from the returned probability
  aprob, h = policy_forward(x)
  action = 2 if np.random.uniform() < aprob else 3 # roll the dice!
  # record various intermediates (needed later for backprop)
  xs.append(x) # observation
  hs.append(h) # hidden state
  y = 1 if action == 2 else 0 # a "fake label"
  dlogps.append(y - aprob) # grad that encourages the action that was taken to be taken (see http://cs231n.github.io/neural-networks-2/#losses if confused)

  # step the environment and get new measurements
  observation, reward, done, info = env.step(action)
  reward_sum += reward
  drs.append(reward) # record reward (has to be done after we call step() to get reward for previous action)

  if done: # an episode finished
    episode_number += 1

    # stack together all inputs, hidden states, action gradients, and rewards for this episode
    epx = np.vstack(xs)
    eph = np.vstack(hs)
    epdlogp = np.vstack(dlogps)
    epr = np.vstack(drs)
    xs,hs,dlogps,drs = [],[],[],[] # reset array memory

    # compute the discounted reward backwards through time
    discounted_epr = discount_rewards(epr)
    # standardize the rewards to be unit normal (helps control the gradient estimator variance)
    discounted_epr -= np.mean(discounted_epr)
    discounted_epr /= np.std(discounted_epr)

    epdlogp *= discounted_epr # modulate the gradient with advantage (PG magic happens right here.)
    grad = policy_backward(eph, epdlogp)
    for k in model: grad_buffer[k] += grad[k] # accumulate grad over batch

    # perform rmsprop parameter update every batch_size episodes
    if episode_number % batch_size == 0:
      for k,v in model.iteritems():
        g = grad_buffer[k] # gradient
        rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2
        model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
        grad_buffer[k] = np.zeros_like(v) # reset batch gradient buffer

    # boring book-keeping
    running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
    print 'resetting env. episode reward total was %f. running mean: %f' % (reward_sum, running_reward)
    if episode_number % 100 == 0: pickle.dump(model, open('save.p', 'wb'))
    reward_sum = 0
    observation = env.reset() # reset env
    prev_x = None

  if reward != 0: # Pong has either +1 or -1 reward exactly when game ends.
    print ('ep %d: game finished, reward: %f' % (episode_number, reward)) + ('' if reward == -1 else ' !!!!!!!!')
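For reference, the `dlogps.append(y - aprob)` line implements the gradient of the log-likelihood of the sampled action under a sigmoid policy (this is the identity the cs231n link in the comment derives). A minimal sketch, written in Python 3 with assumed toy values, that checks the identity numerically:

import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def log_prob(logp, y):
    # log-likelihood of the taken action under a sigmoid policy:
    # p = P(action 2); y = 1 means action 2 was taken, y = 0 means action 3
    p = sigmoid(logp)
    return np.log(p) if y == 1 else np.log(1 - p)

logp, y, eps = 0.3, 1, 1e-6  # assumed toy score and label
numeric = (log_prob(logp + eps, y) - log_prob(logp - eps, y)) / (2 * eps)
analytic = y - sigmoid(logp)
print(numeric, analytic)  # both ~0.4256: d(log pi)/d(logp) == y - aprob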