@CPPAlien
Forked from karpathy/pg-pong.py
Last active September 22, 2024 23:18

Revisions

  1. CPPAlien revised this gist Sep 22, 2024. 2 changed files with 124 additions and 132 deletions.
    124 changes: 124 additions & 0 deletions pg-pong-pytorch.py
    @@ -0,0 +1,124 @@
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import gym
    import numpy as np
    import pickle

    # Hyperparameters
    H = 200 # Number of hidden layer neurons
    batch_size = 10 # Every how many episodes to do a param update?
    learning_rate = 1e-4
    gamma = 0.99 # Discount factor for reward
    decay_rate = 0.99 # Decay factor for RMSProp leaky sum of grad^2
    resume = False # Resume from previous checkpoint?
    render = False

    # Model initialization
    D = 80 * 80 # Input dimensionality: 80x80 grid
    model = nn.Sequential(
        nn.Linear(D, H),
        nn.ReLU(),
        nn.Linear(H, 2) # Output layer for two actions
    )

    optimizer = optim.RMSprop(model.parameters(), lr=learning_rate, alpha=decay_rate)

    if resume:
        model.load_state_dict(torch.load('save.p'))

    def prepro(I):
        """ Preprocess 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
        I = np.array(I[0]) if isinstance(I, tuple) else np.array(I) # Ensure I is a NumPy array
        I = I[35:195] # Crop the playing field
        if I.ndim == 3: # Check if I has 3 dimensions
            I = I[::2, ::2, 0] # Downsample by factor of 2
        else: # Handle the case where I is 2-dimensional
            I = I[::2, ::2] # Downsample accordingly
        I[I == 144] = 0 # Erase background (background type 1)
        I[I == 109] = 0 # Erase background (background type 2)
        I[I != 0] = 1 # Everything else (paddles, ball) just set to 1
        return torch.FloatTensor(I).view(-1) # Convert to PyTorch tensor and flatten

    def discount_rewards(r):
        """ Take 1D float array of rewards and compute discounted reward """
        discounted_r = np.zeros_like(r)
        running_add = 0
        for t in reversed(range(r.size)):
            if r[t] != 0: running_add = 0 # Reset the sum, since this was a game boundary
            running_add = running_add * gamma + r[t]
            discounted_r[t] = running_add
        return discounted_r

    def policy_forward(x):
        h = model(x)
        p = torch.softmax(h, dim=0) # Apply softmax to get action probabilities
        return p

    env = gym.make("PongNoFrameskip-v4")
    observation = env.reset()
    prev_x = None # Used in computing the difference frame
    xs, hs, dlogps, drs = [], [], [], []
    running_reward = None
    reward_sum = 0
    episode_number = 0

    while True:
        if render: env.render()

        # Preprocess the observation, set input to network to be difference image
        cur_x = prepro(observation)
        x = cur_x - prev_x if prev_x is not None else torch.zeros(D)
        prev_x = cur_x

        # Forward the policy network and sample an action from the returned probability
        aprob = policy_forward(x)
        action = 2 if torch.rand(1).item() < aprob[1] else 3 # Roll the dice!

        # Record various intermediates (needed later for backprop)
        xs.append(x) # Observation
        hs.append(aprob) # Hidden state
        y = 1 if action == 2 else 0 # A "fake label"
        dlogps.append(y - aprob[1]) # Grad that encourages the action that was taken

        # Step the environment and get new measurements
        result = env.step(action) # Capture all returned values
        observation, reward, done, info = result[:4] # Unpack the first four values
        reward_sum += reward

        drs.append(reward) # Record reward

        if done: # An episode finished
            episode_number += 1

            # Stack together all inputs, hidden states, action gradients, and rewards for this episode
            epx = torch.stack(xs)
            eph = torch.stack(hs)
            epdlogp = torch.stack(dlogps)
            epr = torch.FloatTensor(drs)
            xs, hs, dlogps, drs = [], [], [], [] # Reset array memory

            # Compute the discounted reward backwards through time
            discounted_epr = discount_rewards(epr.numpy())
            discounted_epr = torch.FloatTensor(discounted_epr)
            discounted_epr -= discounted_epr.mean() # Standardize the rewards
            discounted_epr /= discounted_epr.std()

            epdlogp *= discounted_epr # Modulate the gradient with advantage

            # Backpropagation
            optimizer.zero_grad()
            loss = -torch.sum(epdlogp * torch.log(aprob[1])) # Negative log likelihood
            loss.backward()
            optimizer.step()

            # Boring bookkeeping
            running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
            print('Resetting env. Episode reward total was %f. Running mean: %f' % (reward_sum, running_reward))
            if episode_number % 100 == 0: torch.save(model.state_dict(), 'save.p')
            reward_sum = 0
            observation = env.reset() # Reset env
            prev_x = None

        if reward != 0: # Pong has either +1 or -1 reward exactly when game ends.
            print('Episode %d: Game finished, reward: %f' % (episode_number, reward) + ('' if reward == -1 else ' !!!!!!!!'))
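    Note on the backpropagation step above: the loss as written reuses aprob from the final timestep of the episode only. A more conventional REINFORCE objective sums the log-probabilities of the actions actually taken across the episode, weighted by the standardized advantages. A minimal sketch of that variant, assuming a hypothetical per-step list logps collected alongside dlogps (not part of the gist):

    # Inside the loop, right after sampling the action (hypothetical addition):
    #     logps.append(torch.log(aprob[1] if action == 2 else aprob[0]))
    # Then, in place of the loss above once the episode is done:
    eplogp = torch.stack(logps)                 # log-prob of each sampled action
    loss = -torch.sum(eplogp * discounted_epr)  # advantage-weighted negative log-likelihood
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()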
    132 changes: 0 additions & 132 deletions pg-pong.py
    @@ -1,132 +0,0 @@
    """ Trains an agent with (stochastic) Policy Gradients on Pong. Uses OpenAI Gym. """
    import numpy as np
    import cPickle as pickle
    import gym

    # hyperparameters
    H = 200 # number of hidden layer neurons
    batch_size = 10 # every how many episodes to do a param update?
    learning_rate = 1e-4
    gamma = 0.99 # discount factor for reward
    decay_rate = 0.99 # decay factor for RMSProp leaky sum of grad^2
    resume = False # resume from previous checkpoint?
    render = False

    # model initialization
    D = 80 * 80 # input dimensionality: 80x80 grid
    if resume:
        model = pickle.load(open('save.p', 'rb'))
    else:
        model = {}
        model['W1'] = np.random.randn(H,D) / np.sqrt(D) # "Xavier" initialization
        model['W2'] = np.random.randn(H) / np.sqrt(H)

    grad_buffer = { k : np.zeros_like(v) for k,v in model.iteritems() } # update buffers that add up gradients over a batch
    rmsprop_cache = { k : np.zeros_like(v) for k,v in model.iteritems() } # rmsprop memory

    def sigmoid(x):
        return 1.0 / (1.0 + np.exp(-x)) # sigmoid "squashing" function to interval [0,1]

    def prepro(I):
        """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
        I = I[35:195] # crop
        I = I[::2,::2,0] # downsample by factor of 2
        I[I == 144] = 0 # erase background (background type 1)
        I[I == 109] = 0 # erase background (background type 2)
        I[I != 0] = 1 # everything else (paddles, ball) just set to 1
        return I.astype(np.float).ravel()

    def discount_rewards(r):
        """ take 1D float array of rewards and compute discounted reward """
        discounted_r = np.zeros_like(r)
        running_add = 0
        for t in reversed(xrange(0, r.size)):
            if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)
            running_add = running_add * gamma + r[t]
            discounted_r[t] = running_add
        return discounted_r

    def policy_forward(x):
        h = np.dot(model['W1'], x)
        h[h<0] = 0 # ReLU nonlinearity
        logp = np.dot(model['W2'], h)
        p = sigmoid(logp)
        return p, h # return probability of taking action 2, and hidden state

    def policy_backward(eph, epdlogp):
        """ backward pass. (eph is array of intermediate hidden states) """
        dW2 = np.dot(eph.T, epdlogp).ravel()
        dh = np.outer(epdlogp, model['W2'])
        dh[eph <= 0] = 0 # backpro prelu
        dW1 = np.dot(dh.T, epx)
        return {'W1':dW1, 'W2':dW2}

    env = gym.make("Pong-v0")
    observation = env.reset()
    prev_x = None # used in computing the difference frame
    xs,hs,dlogps,drs = [],[],[],[]
    running_reward = None
    reward_sum = 0
    episode_number = 0
    while True:
        if render: env.render()

        # preprocess the observation, set input to network to be difference image
        cur_x = prepro(observation)
        x = cur_x - prev_x if prev_x is not None else np.zeros(D)
        prev_x = cur_x

        # forward the policy network and sample an action from the returned probability
        aprob, h = policy_forward(x)
        action = 2 if np.random.uniform() < aprob else 3 # roll the dice!

        # record various intermediates (needed later for backprop)
        xs.append(x) # observation
        hs.append(h) # hidden state
        y = 1 if action == 2 else 0 # a "fake label"
        dlogps.append(y - aprob) # grad that encourages the action that was taken to be taken (see http://cs231n.github.io/neural-networks-2/#losses if confused)

        # step the environment and get new measurements
        observation, reward, done, info = env.step(action)
        reward_sum += reward

        drs.append(reward) # record reward (has to be done after we call step() to get reward for previous action)

        if done: # an episode finished
            episode_number += 1

            # stack together all inputs, hidden states, action gradients, and rewards for this episode
            epx = np.vstack(xs)
            eph = np.vstack(hs)
            epdlogp = np.vstack(dlogps)
            epr = np.vstack(drs)
            xs,hs,dlogps,drs = [],[],[],[] # reset array memory

            # compute the discounted reward backwards through time
            discounted_epr = discount_rewards(epr)
            # standardize the rewards to be unit normal (helps control the gradient estimator variance)
            discounted_epr -= np.mean(discounted_epr)
            discounted_epr /= np.std(discounted_epr)

            epdlogp *= discounted_epr # modulate the gradient with advantage (PG magic happens right here.)
            grad = policy_backward(eph, epdlogp)
            for k in model: grad_buffer[k] += grad[k] # accumulate grad over batch

            # perform rmsprop parameter update every batch_size episodes
            if episode_number % batch_size == 0:
                for k,v in model.iteritems():
                    g = grad_buffer[k] # gradient
                    rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2
                    model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
                    grad_buffer[k] = np.zeros_like(v) # reset batch gradient buffer

            # boring book-keeping
            running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
            print 'resetting env. episode reward total was %f. running mean: %f' % (reward_sum, running_reward)
            if episode_number % 100 == 0: pickle.dump(model, open('save.p', 'wb'))
            reward_sum = 0
            observation = env.reset() # reset env
            prev_x = None

        if reward != 0: # Pong has either +1 or -1 reward exactly when game ends.
            print ('ep %d: game finished, reward: %f' % (episode_number, reward)) + ('' if reward == -1 else ' !!!!!!!!')
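    For intuition about what discount_rewards produces, here is the same logic run standalone on a made-up reward trace covering two games inside one episode (illustrative only; the numbers are not from the gist):

    import numpy as np

    gamma = 0.99

    def discount_rewards(r):
        # same logic as above: gamma-discounted running sum, reset at each game boundary
        discounted_r = np.zeros_like(r)
        running_add = 0
        for t in reversed(range(r.size)):
            if r[t] != 0: running_add = 0
            running_add = running_add * gamma + r[t]
            discounted_r[t] = running_add
        return discounted_r

    # two games inside one episode: a win, then a loss
    r = np.array([0., 0., 1., 0., 0., -1.])
    print(discount_rewards(r))
    # prints, up to formatting: [ 0.9801  0.99  1.  -0.9801  -0.99  -1. ]
    # every frame leading up to a win gets positive credit, every frame leading
    # up to a loss gets negative credit, and credit decays with distance in time.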
  2. @karpathy karpathy created this gist May 30, 2016.
    132 changes: 132 additions & 0 deletions pg-pong.py
    @@ -0,0 +1,132 @@
    """ Trains an agent with (stochastic) Policy Gradients on Pong. Uses OpenAI Gym. """
    import numpy as np
    import cPickle as pickle
    import gym

    # hyperparameters
    H = 200 # number of hidden layer neurons
    batch_size = 10 # every how many episodes to do a param update?
    learning_rate = 1e-4
    gamma = 0.99 # discount factor for reward
    decay_rate = 0.99 # decay factor for RMSProp leaky sum of grad^2
    resume = False # resume from previous checkpoint?
    render = False

    # model initialization
    D = 80 * 80 # input dimensionality: 80x80 grid
    if resume:
        model = pickle.load(open('save.p', 'rb'))
    else:
        model = {}
        model['W1'] = np.random.randn(H,D) / np.sqrt(D) # "Xavier" initialization
        model['W2'] = np.random.randn(H) / np.sqrt(H)

    grad_buffer = { k : np.zeros_like(v) for k,v in model.iteritems() } # update buffers that add up gradients over a batch
    rmsprop_cache = { k : np.zeros_like(v) for k,v in model.iteritems() } # rmsprop memory

    def sigmoid(x):
        return 1.0 / (1.0 + np.exp(-x)) # sigmoid "squashing" function to interval [0,1]

    def prepro(I):
        """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
        I = I[35:195] # crop
        I = I[::2,::2,0] # downsample by factor of 2
        I[I == 144] = 0 # erase background (background type 1)
        I[I == 109] = 0 # erase background (background type 2)
        I[I != 0] = 1 # everything else (paddles, ball) just set to 1
        return I.astype(np.float).ravel()

    def discount_rewards(r):
        """ take 1D float array of rewards and compute discounted reward """
        discounted_r = np.zeros_like(r)
        running_add = 0
        for t in reversed(xrange(0, r.size)):
            if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)
            running_add = running_add * gamma + r[t]
            discounted_r[t] = running_add
        return discounted_r

    def policy_forward(x):
        h = np.dot(model['W1'], x)
        h[h<0] = 0 # ReLU nonlinearity
        logp = np.dot(model['W2'], h)
        p = sigmoid(logp)
        return p, h # return probability of taking action 2, and hidden state

    def policy_backward(eph, epdlogp):
        """ backward pass. (eph is array of intermediate hidden states) """
        dW2 = np.dot(eph.T, epdlogp).ravel()
        dh = np.outer(epdlogp, model['W2'])
        dh[eph <= 0] = 0 # backpro prelu
        dW1 = np.dot(dh.T, epx)
        return {'W1':dW1, 'W2':dW2}

    env = gym.make("Pong-v0")
    observation = env.reset()
    prev_x = None # used in computing the difference frame
    xs,hs,dlogps,drs = [],[],[],[]
    running_reward = None
    reward_sum = 0
    episode_number = 0
    while True:
        if render: env.render()

        # preprocess the observation, set input to network to be difference image
        cur_x = prepro(observation)
        x = cur_x - prev_x if prev_x is not None else np.zeros(D)
        prev_x = cur_x

        # forward the policy network and sample an action from the returned probability
        aprob, h = policy_forward(x)
        action = 2 if np.random.uniform() < aprob else 3 # roll the dice!

        # record various intermediates (needed later for backprop)
        xs.append(x) # observation
        hs.append(h) # hidden state
        y = 1 if action == 2 else 0 # a "fake label"
        dlogps.append(y - aprob) # grad that encourages the action that was taken to be taken (see http://cs231n.github.io/neural-networks-2/#losses if confused)

        # step the environment and get new measurements
        observation, reward, done, info = env.step(action)
        reward_sum += reward

        drs.append(reward) # record reward (has to be done after we call step() to get reward for previous action)

        if done: # an episode finished
            episode_number += 1

            # stack together all inputs, hidden states, action gradients, and rewards for this episode
            epx = np.vstack(xs)
            eph = np.vstack(hs)
            epdlogp = np.vstack(dlogps)
            epr = np.vstack(drs)
            xs,hs,dlogps,drs = [],[],[],[] # reset array memory

            # compute the discounted reward backwards through time
            discounted_epr = discount_rewards(epr)
            # standardize the rewards to be unit normal (helps control the gradient estimator variance)
            discounted_epr -= np.mean(discounted_epr)
            discounted_epr /= np.std(discounted_epr)

            epdlogp *= discounted_epr # modulate the gradient with advantage (PG magic happens right here.)
            grad = policy_backward(eph, epdlogp)
            for k in model: grad_buffer[k] += grad[k] # accumulate grad over batch

            # perform rmsprop parameter update every batch_size episodes
            if episode_number % batch_size == 0:
                for k,v in model.iteritems():
                    g = grad_buffer[k] # gradient
                    rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2
                    model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
                    grad_buffer[k] = np.zeros_like(v) # reset batch gradient buffer

            # boring book-keeping
            running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
            print 'resetting env. episode reward total was %f. running mean: %f' % (reward_sum, running_reward)
            if episode_number % 100 == 0: pickle.dump(model, open('save.p', 'wb'))
            reward_sum = 0
            observation = env.reset() # reset env
            prev_x = None

        if reward != 0: # Pong has either +1 or -1 reward exactly when game ends.
            print ('ep %d: game finished, reward: %f' % (episode_number, reward)) + ('' if reward == -1 else ' !!!!!!!!')
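    A quick note on the dlogps.append(y - aprob) line above: for a sigmoid policy p = sigmoid(logit), the quantity (y - p) is exactly the derivative of log p(y) with respect to the logit, which is why it serves as the per-step policy gradient before being scaled by the advantage. A small finite-difference check of that identity (a standalone sketch, not part of the gist):

    import numpy as np

    def sigmoid(z):
        return 1.0 / (1.0 + np.exp(-z))

    def log_lik(z, y):
        # log-probability of label y under a sigmoid policy with logit z
        p = sigmoid(z)
        return np.log(p) if y == 1 else np.log(1.0 - p)

    logit, y, eps = 0.3, 1, 1e-6
    analytic = y - sigmoid(logit)  # the (y - aprob) term used in the script
    numeric = (log_lik(logit + eps, y) - log_lik(logit - eps, y)) / (2 * eps)
    print(analytic, numeric)  # both come out to about 0.4256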