In [1]:
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

<pyvirtualdisplay.display.Display at 0x7f9c60386f10>

In [2]:
import torch.nn as nn

class Net(nn.Module):
    def __init__(self, obs_size, actions_size, hidden_layers):
        super(Net, self).__init__()
        
        self.model = nn.Sequential(
            nn.Linear(obs_size, hidden_layers),
            nn.ReLU(),
            nn.Linear(hidden_layers, actions_size)
        )

    def forward(self, x):
        return self.model(x)


In [29]:
import gymnasium
import gymnasium as gym
from random import random

import torch
import torch.nn as nn
import torch.optim as optim

import numpy as np

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

env = gym.make("CartPole-v1")
observation, info = env.reset()

EPSILON = 0.05
EPISODE_LEN = 20
def generate_episodes(predict):
    while True:
        i = 0
        truncated = False
        terminated = False
        
        episode = list()
        episode_reward = 0
        
        observation, info = env.reset()
        while i < EPISODE_LEN and not truncated and not terminated:
            if random() <= EPSILON:
                next_action = env.action_space.sample()
            else:
                next_action = predict(observation)
            observation, reward, terminated, truncated, info = env.step(next_action)
            episode.append((observation, next_action))
            episode_reward += reward
            i += 1
            
        yield (episode, episode_reward)

dim_one_softmax = nn.Softmax(dim=1)
def sample_model_actions_distribution(model, observation):
    observation_tensor = torch.tensor(observation, dtype=torch.float32).to(DEVICE)
    observation_minibatch = observation_tensor.unsqueeze(0)
    action_probability_distribution = dim_one_softmax(model(observation_minibatch)).to('cpu').data.numpy()[0]
    action_sampled = np.random.choice(len(action_probability_distribution), p=action_probability_distribution)
    return action_sampled

    
BATCH_LEN = 100
HIDDEN_SIZE = 128
LEARNING_RATE = 0.01
TAKE_TOP_P = 0.3 # Best 20% of episodes used for training
def train_model():
    obs_size = env.observation_space.shape[0]
    n_actions = int(env.action_space.n)
    model = Net(
        obs_size=obs_size,
        actions_size=n_actions,
        hidden_layers=HIDDEN_SIZE
    ).to(DEVICE)

    objective = nn.CrossEntropyLoss()
    optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)
    
    # Train model
    iteration = 0
    median_reward = 0
    while median_reward < 475:
        # Play with current model
        episodes_generator = generate_episodes(lambda observation: sample_model_actions_distribution(model, observation))
        episodes = [next(episodes_generator) for _ in range(0, BATCH_LEN)]
        median_reward = np.median([x[1] for x in episodes])
        best_reward = np.max([x[1] for x in episodes])
        worst_reward = np.min([x[1] for x in episodes])
        print(f"Played {BATCH_LEN} episodes. Median reward: {median_reward}, Best reward: {best_reward}, Worst reward: {worst_reward}")

        # Pick best p episodes
        episodes_sorted = sorted(episodes, key=lambda x: x[1], reverse=True)
        episodes_top_p = episodes_sorted[0:int(TAKE_TOP_P * BATCH_LEN)]
        print(f"Selected {len(episodes_top_p)} best episodes")
        print(f"Rewards: {', '.join([str(x[1]) for x in episodes_top_p])}")

        # Train the model on the best (obs, action) pairs. Episodes is a list of ((obs, action), total_reward) pairs
        pairs = [x[0] for x in episodes_top_p]
        flat_pairs = [item for sublist in pairs for item in sublist]
        minibatch_observations = torch.tensor([pair[0] for pair in flat_pairs], dtype=torch.float32).to(DEVICE)
        minibatch_actions = torch.tensor([pair[1] for pair in flat_pairs], dtype=torch.long).to(DEVICE)

        optimizer.zero_grad()
        predicted_actions = model(minibatch_observations)
        loss = objective(predicted_actions, minibatch_actions) # CrossEntropyLoss -> difference between predicted and actual actions
        loss.backward()
        optimizer.step()
        print(f"Iteration {iteration} - loss: ", loss.item())
        iteration += 1

    
train_model()

env.close()

Played 100 episodes. Median reward: 20.0, Best reward: 20.0, Worst reward: 10.0
Selected 30 best episodes
Rewards: 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0
Iteration 0 - loss:  0.7031133770942688
Played 100 episodes. Median reward: 15.5, Best reward: 20.0, Worst reward: 9.0
Selected 30 best episodes
Rewards: 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 19.0, 19.0
Iteration 1 - loss:  0.654332160949707
Played 100 episodes. Median reward: 14.0, Best reward: 20.0, Worst reward: 8.0
Selected 30 best episodes
Rewards: 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 19.0, 19.0, 19.0, 18.0, 18.0, 18.0, 18.0, 18.0, 18.0, 17.0, 17.0, 16.0
Iteration 2 - loss:  0.5910254120826721
P

KeyboardInterrupt: 

1