# Value Iteration / Q Iteration

Background: 

1) Using the Bellman equation the value function can be defined as the maximum Q function over all actions
 $$
 V(s) = \max_{a \in A} Q(s,a)
 $$

2) Q function is defined recursively as the sum of the reward and the discounted value of the next state
 $$
 Q(s,a) = r(s,a) + \gamma \max_{a' \in A} Q(s',a')
 $$

3) **Value iteration**: an algorithm that iteratively updates the Q function until convergence. It's a programming algorithm that uses the Bellman equation to update the value function.
 General algorithm:
 - Initialize the value function $V(s)$ for all states to some value (usually 0)
 - Iterate over all states s and update the value function using the Bellman update
 $$
 V_s \leftarrow \max_a \sum_{s'} p_{a,s \rightarrow s'} (r_{s,a,s'} + \gamma V_{s'})
 $$
 - Repeat until convergence

4) **Q iteration**: this is essentially the same as value iteration but using the Q function instead of the value function in the update step.
 
 Plug in 
 $$
 V(s) = \max_{a \in A} Q(s,a)
 $$
 gives the update function
 $$
 V_s \leftarrow \max_a \sum_{s'} p_{a,s \rightarrow s'} (r_{s,a,s'} + \gamma\max_{a' \in A} Q(s',a'))
 $$


# Q-Learning

Value and Q iteration assume that the MDP is fully known (including the transition probabilities). We don't know these probabilities in the environment so we can't use these algorithms directly. But we can sample the environment and estimate the Q function from data.

General algo:
1) Start with an empty mapping of states to action values (Q functions)
2) Interact with the environment to collect (s, a, r, s') tuples (state, action, reward, next state). We use epsilon-greedy policy to select actions (for exploration/exploitation tradeoff)
3) Update the Q function using the Bellman approximation. Same update function as above BUT we blend the current estimate with the new estimate using a learning rate alpha.
 $$
 Q(s,a) \leftarrow (1 - \alpha) Q(s,a) + \alpha (r + \gamma \max_{a'} Q(s',a'))
 $$ 
4) Repeat until convergence

Notes:
- We need the epsilon-greedy policy to ensure that all states are visited (http://users.isr.ist.utl.pt/~mtjspaan/readingGroup/ProofQlearning.pdf)
- Can we implement optimistic & incremental Q learning? https://proceedings.neurips.cc/paper_files/paper/2001/file/6f2688a5fce7d48c8d19762b88c32c3b-Paper.pdf
- According to notes23.pdf alpha needs to trend to 0 for convergence. Not sure why, it is also mentioned that in practice a small constant alpha works well.

In [19]:
import gymnasium as gym
from random import random
import numpy as np
import typing as tt


State = int
Action = int
ValuesKey = tt.Tuple[State, Action]

# Value table for Q(s,a)
q_values: tt.Dict[ValuesKey, float] = dict()

GAMMA = 0.9
ALPHA = 0.2
EPSILON = 0.05


ENV = "FrozenLake-v1"

env = gym.make(ENV)

def select_best_action(state: State):
 best_action = None
 best_value = None
 for action in range(env.action_space.n):
 value = q_values[(state, action)]
 if best_value is None or value > best_value:
 best_value = value
 best_action = action
 return best_action

def play_episode(env: gym.Env):
 state, _ = env.reset()
 done = False
 while not done:
 if random() < EPSILON:
 action = env.action_space.sample()
 else:
 action = select_best_action(state)
 action = env.action_space.sample()
 new_state, reward, terminated, truncated, _ = env.step(action)
 yield state, action, reward, new_state
 state = new_state
 done = terminated or truncated

# s a r s'
def value_iteration(sample_tuples: tt.Iterable[tt.Tuple[State, Action, float, State]]):
 for state, action, reward, new_state in sample_tuples:
 q_values[(state, action)] = (1 - ALPHA) * q_values[(state, action)] + ALPHA * (reward + GAMMA * max(q_values[(new_state, a_prime)] for a_prime in range(env.action_space.n)))
 pass

eval_env = gym.make(ENV)
# Returns the fraction of episodes that were successful
def evaluate_policy(num_episodes = 10):
 def play_once():
 state, _ = eval_env.reset()
 total_reward = 0
 done = False
 while not done:
 new_state, reward, terminated, truncated, _ = eval_env.step(select_best_action(state))
 total_reward += reward
 state = new_state
 done = terminated or truncated
 return total_reward
 
 episode_rewards = [
 play_once() for _ in range(num_episodes)
 ]
 return len([r for r in episode_rewards if r > 0]) / len(episode_rewards)

def train():
 step = 0
 solve_rate = 0

 # Initialize the Q-values to 0
 for state in range(env.observation_space.n):
 for action in range(env.action_space.n):
 q_values[(state, action)] = 0

 while solve_rate < 0.8:
 samples = list(play_episode(env))
 value_iteration(samples)
 new_solve_rate = evaluate_policy(100)
 if new_solve_rate > solve_rate:
 print(f"Step {step}, solve rate {solve_rate} -> {new_solve_rate}")
 solve_rate = new_solve_rate
 step += 1
 
 print("Solved in", step, "steps")
 print("Playing final policy", evaluate_policy(1000))

train()

env.close()

Step 21, solve rate 0 -> 0.01
Step 22, solve rate 0.01 -> 0.04
Step 33, solve rate 0.04 -> 0.05
Step 40, solve rate 0.05 -> 0.06
Step 51, solve rate 0.06 -> 0.17
Step 145, solve rate 0.17 -> 0.2
Step 147, solve rate 0.2 -> 0.26
Step 268, solve rate 0.26 -> 0.28
Step 328, solve rate 0.28 -> 0.34
Step 509, solve rate 0.34 -> 0.36
Step 514, solve rate 0.36 -> 0.41
Step 1169, solve rate 0.41 -> 0.47
Step 1557, solve rate 0.47 -> 0.56
Step 1558, solve rate 0.56 -> 0.58
Step 1562, solve rate 0.58 -> 0.63
Step 1626, solve rate 0.63 -> 0.65
Step 1627, solve rate 0.65 -> 0.68
Step 1828, solve rate 0.68 -> 0.71
Step 1830, solve rate 0.71 -> 0.74
Step 1843, solve rate 0.74 -> 0.76
Step 2299, solve rate 0.76 -> 0.81
Solved in 2300 steps
Playing final policy 0.707


In [23]:
from gymnasium.utils.save_video import save_video

def save_vid():
 env_video = gym.make("FrozenLake-v1", render_mode="rgb_array_list")

 solved = False
 while not solved:
 state, _ = env_video.reset()
 done = False
 while not done:
 new_state, reward, terminated, truncated, _ = env_video.step(select_best_action(state))
 state = new_state
 done = terminated or truncated
 solved = reward > 0
 
 print(save_video(
 env_video.render(),
 '.',
 fps=10
 ))
save_vid()

# Show 'rl-video-episode-0.mp4' inside the notebook (encoded as base64)
from base64 import b64encode
bytes = open("rl-video-episode-0.mp4", "rb").read()
base64 = b64encode(bytes).decode()

from IPython.display import HTML
HTML(f'')

None
