Mini Project: Dynamic Programming

In this notebook, you will write your own implementations of many classical dynamic programming algorithms.

While we have provided some starter code, you are welcome to erase these hints and write your code from scratch.

Part 0: Explore FrozenLakeEnv

Use the code cell below to create an instance of the FrozenLake environment.

In [1]:
from frozenlake import FrozenLakeEnv

env = FrozenLakeEnv()

The agent moves through a $4 \times 4$ gridworld, with states numbered as follows:

[[ 0  1  2  3]
 [ 4  5  6  7]
 [ 8  9 10 11]
 [12 13 14 15]]

and the agent has 4 potential actions:

LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

Thus, $\mathcal{S}^+ = \{0, 1, \ldots, 15\}$, and $\mathcal{A} = \{0, 1, 2, 3\}$. Verify this by running the code cell below.

In [2]:
# print the state space and action space
print(env.observation_space)
print(env.action_space)

# print the total number of states and actions
print(env.nS)
print(env.nA)
Discrete(16)
Discrete(4)
16
4

Dynamic programming assumes that the agent has full knowledge of the MDP. We have already amended the frozenlake.py file to make the one-step dynamics accessible to the agent.

Execute the code cell below to return the one-step dynamics corresponding to a particular state and action. For instance, env.P[1][0] returns the probability of each possible next state and reward if the agent is in state 1 of the gridworld and decides to go left.

In [3]:
# the frozen lake is slippery, so every intended action has a variety of outcomes:
# here the intention is to move down (1), but the possible outcomes are moving left, 
# staying put, or moving right, all equally likely.
env.P[14][1]
Out[3]:
[(0.3333333333333333, 13, 0.0, False),
 (0.3333333333333333, 14, 0.0, False),
 (0.3333333333333333, 15, 1.0, True)]
In [4]:
# for every state, all four intended actions are available, 
# and each produces its own set of possible outcomes
env.P[14]
Out[4]:
{0: [(0.3333333333333333, 10, 0.0, False),
  (0.3333333333333333, 13, 0.0, False),
  (0.3333333333333333, 14, 0.0, False)],
 1: [(0.3333333333333333, 13, 0.0, False),
  (0.3333333333333333, 14, 0.0, False),
  (0.3333333333333333, 15, 1.0, True)],
 2: [(0.3333333333333333, 14, 0.0, False),
  (0.3333333333333333, 15, 1.0, True),
  (0.3333333333333333, 10, 0.0, False)],
 3: [(0.3333333333333333, 15, 1.0, True),
  (0.3333333333333333, 10, 0.0, False),
  (0.3333333333333333, 13, 0.0, False)]}
In [5]:
# every state offers all four actions as possible intended moves
[len(env.P[s]) for s in range(env.nS)]
Out[5]:
[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]

Each entry takes the form

prob, next_state, reward, done

where:

  • prob details the conditional probability of the corresponding (next_state, reward) pair, and
  • done is True if the next_state is a terminal state, and otherwise False.

Thus, we can interpret env.P[1][0] as follows: $$ \mathbb{P}(S_{t+1}=s',R_{t+1}=r|S_t=1,A_t=0) = \begin{cases} \frac{1}{3} \text{ if } s'=1, r=0\\ \frac{1}{3} \text{ if } s'=0, r=0\\ \frac{1}{3} \text{ if } s'=5, r=0\\ 0 \text{ else} \end{cases} $$

Feel free to change the code cell above to explore how the environment behaves in response to other (state, action) pairs.
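
If you like, the optional cell below (not part of the original starter code) confirms that the outcome probabilities for every (state, action) pair sum to one, and displays env.P[1][0] so you can compare it with the interpretation above.

In [ ]:
# optional sanity check: the one-step dynamics form a proper probability distribution, 
# so the outcome probabilities for every (state, action) pair should sum to 1
assert all(
    abs(sum(prob for prob, _, _, _ in env.P[s][a]) - 1.0) < 1e-12
    for s in range(env.nS)
    for a in range(env.nA)
)

# the dynamics interpreted in the text above
env.P[1][0]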

Part 1: Iterative Policy Evaluation

In this section, you will write your own implementation of iterative policy evaluation.

Your algorithm should accept four arguments as input:

  • env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
  • policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
  • gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
  • theta: This is a very small positive number that is used to decide if the estimate has sufficiently converged to the true value function (default value: 1e-8).

The algorithm returns as output:

  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s under the input policy.

Please complete the function in the code cell below.

In [6]:
import numpy as np

def policy_evaluation(env, policy, gamma=1, theta=1e-8):
    """if the policy is known, the state-value function can be calculated from it
    in an iterative way"""
    
    # the state-value function is initialized with a zero guess
    V = np.zeros(env.nS)
    
    # now the iteration begins
    while True:
        # initialize the maximal difference to the next value function
        delta = 0
        
        # the value of every state is recalculated: each calculation uses the 
        # latest values of the value function for the other states
        for s in range(env.nS):
            
            # initialize the next calculation of V[s] 
            Vs_new = 0
            
            # the policy is stochastic, so every action is taken with some probability
            for a_intended, p_a_intended in enumerate(policy[s]):
                
                # for each intended action there is a list of possible outcomes; 
                # the outcomes are tuples of probability, next state, reward (and an 
                # indicator of whether the next state is terminal)
                for p_outcome, s_outcome, r_outcome, _ in env.P[s][a_intended]:
                    
                    # the state value sums these outcomes weighted by their probabilities
                    Vs_new += p_a_intended * p_outcome * (r_outcome + gamma * V[s_outcome])
        
            # update the stopping criterion for the iteration, which is the maximal 
            # difference between the new and the old state value in this sweep
            delta = max(delta, abs(V[s] - Vs_new))

            # the newly calculated value immediately replaces the old one in the value 
            # function and is used when calculating the next states
            V[s] = Vs_new
        
        # test the stopping criterion after every complete sweep through all states
        if delta < theta:
            break
    
    # when the iteration has ended, the state-value function is returned
    return V
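
As an optional aside, you might first try the function on a simple deterministic policy; the sketch below evaluates a hypothetical always-DOWN policy and reshapes the result to the gridworld.

In [ ]:
# illustrative check: evaluate a deterministic policy that always moves DOWN (action 1)
always_down = np.zeros([env.nS, env.nA])
always_down[:, 1] = 1

print(policy_evaluation(env, always_down).reshape(4, 4))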

We will evaluate the equiprobable random policy $\pi$, where $\pi(a|s) = \frac{1}{|\mathcal{A}(s)|}$ for all $s\in\mathcal{S}$ and $a\in\mathcal{A}(s)$.

Use the code cell below to specify this policy in the variable random_policy.

In [7]:
random_policy = np.ones([env.nS, env.nA]) / env.nA

Run the next code cell to evaluate the equiprobable random policy and visualize the output. The state-value function has been reshaped to match the shape of the gridworld.

In [8]:
from plot_utils import plot_values

# evaluate the policy 
V = policy_evaluation(env, random_policy)

plot_values(V)
[plot: state-value function of the equiprobable random policy on the 4x4 grid]

Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that your policy_evaluation function satisfies the requirements outlined above (with four inputs, a single output, and with the default values of the input arguments unchanged).

In [9]:
import check_test

check_test.run_check('policy_evaluation_check', policy_evaluation)

PASSED

Part 2: Obtain $q_\pi$ from $v_\pi$

In this section, you will write a function that takes the state-value function estimate as input, along with some state $s\in\mathcal{S}$. It returns the row in the action-value function corresponding to the input state $s\in\mathcal{S}$. That is, your function should accept as input both $v_\pi$ and $s$, and return $q_\pi(s,a)$ for all $a\in\mathcal{A}(s)$.

Your algorithm should accept four arguments as input:

  • env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
  • s: This is an integer corresponding to a state in the environment. It should be a value between 0 and (env.nS)-1, inclusive.
  • gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).

The algorithm returns as output:

  • q: This is a 1D numpy array with q.shape[0] equal to the number of actions (env.nA). q[a] contains the (estimated) value of state s and action a.

Please complete the function in the code cell below.

In [10]:
def q_from_v(env, V, s, gamma=1):
    """from the state-value function V, the action-value function q can be calculated 
    for any state: q(s, a) is the expected return from taking action a in state s and 
    then following the policy for all subsequent steps.
    
    this function computes q(s, a) for one state s and all actions a
    """
    # initialize the q values
    q = np.zeros(env.nA)
         
    # for all possible actions in state s, which (as we saw earlier) is every 
    # action in the action space
    for a_intended in env.P[s].keys():

        # for all possible outcomes if action a_intended is taken
        for p_outcome, s_outcome, r_outcome, _ in env.P[s][a_intended]:

            # accumulate each outcome, weighted by its probability
            q[a_intended] += p_outcome * (r_outcome + gamma * V[s_outcome])
    
    return q

Run the code cell below to print the action-value function corresponding to the above state-value function.

In [11]:
Q = np.zeros([env.nS, env.nA])
for s in range(env.nS):
    Q[s] = q_from_v(env, V, s)
print("Action-Value Function:")
print(Q)
Action-Value Function:
[[ 0.0147094   0.01393978  0.01393978  0.01317015]
 [ 0.00852356  0.01163091  0.0108613   0.01550788]
 [ 0.02444514  0.02095298  0.02406033  0.01435346]
 [ 0.01047649  0.01047649  0.00698432  0.01396865]
 [ 0.02166487  0.01701828  0.01624865  0.01006281]
 [ 0.          0.          0.          0.        ]
 [ 0.05433538  0.04735105  0.05433538  0.00698432]
 [ 0.          0.          0.          0.        ]
 [ 0.01701828  0.04099204  0.03480619  0.04640826]
 [ 0.07020885  0.11755991  0.10595784  0.05895312]
 [ 0.18940421  0.17582037  0.16001424  0.04297382]
 [ 0.          0.          0.          0.        ]
 [ 0.          0.          0.          0.        ]
 [ 0.08799677  0.20503718  0.23442716  0.17582037]
 [ 0.25238823  0.53837051  0.52711478  0.43929118]
 [ 0.          0.          0.          0.        ]]
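
Since V was computed for the equiprobable random policy, it should (approximately) satisfy $v_\pi(s) = \sum_a \pi(a|s) q_\pi(s,a)$. The optional check below prints the largest deviation, assuming V, Q, and random_policy from the earlier cells are still in memory.

In [ ]:
# optional consistency check: v_pi should equal the policy-weighted average of q_pi
print(np.max(np.abs(V - np.sum(random_policy * Q, axis=1))))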

Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the q_from_v function satisfies the requirements outlined above (with four inputs, a single output, and with the default values of the input arguments unchanged).

In [12]:
check_test.run_check('q_from_v_check', q_from_v)

PASSED

Part 3: Policy Improvement

In this section, you will write your own implementation of policy improvement.

Your algorithm should accept three arguments as input:

  • env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
  • gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).

The algorithm returns as output:

  • policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.

Please complete the function in the code cell below. You are encouraged to use the q_from_v function you implemented above.

In [13]:
def policy_improvement(env, V, gamma=1):
    """this function improves a policy by building on a state-value function V:
    in every state the improved policy takes the action with the highest action value
    """
    # initialize the new policy
    policy = np.zeros([env.nS, env.nA])
    
    # for every state s, define the next action
    for s in range(env.nS):
        
        # get the action values
        q_s = q_from_v(env, V, s, gamma)
        
        # deterministic policy: choose the action with the highest action value
        policy[s][np.argmax(q_s)] = 1
        
        # stochastic policy: if several actions share the highest action value, we could 
        # instead split the probability among them (see the sketch below)
    
    return policy
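
The last comment in the cell above mentions a stochastic alternative. A minimal sketch of that variant is shown below (policy_improvement_stochastic is not part of the starter code and is not required by the checks): it splits the probability equally among all actions whose value is numerically maximal.

In [ ]:
def policy_improvement_stochastic(env, V, gamma=1):
    """alternative improvement step: spread the probability equally over all actions 
    that achieve the (numerically) maximal action value"""
    policy = np.zeros([env.nS, env.nA])
    
    for s in range(env.nS):
        q_s = q_from_v(env, V, s, gamma)
        
        # indices of all best actions
        best_actions = np.argwhere(q_s == np.max(q_s)).flatten()
        
        # equal probability on every best action, zero elsewhere
        policy[s][best_actions] = 1 / len(best_actions)
    
    return policy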

Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the policy_improvement function satisfies the requirements outlined above (with three inputs, a single output, and with the default values of the input arguments unchanged).

Before moving on to the next part of the notebook, you are strongly encouraged to check out the solution in Dynamic_Programming_Solution.ipynb. There are many correct ways to approach this function!

In [14]:
check_test.run_check('policy_improvement_check', policy_improvement)

PASSED

Part 4: Policy Iteration

In this section, you will write your own implementation of policy iteration. The algorithm returns the optimal policy, along with its corresponding state-value function.

Your algorithm should accept three arguments as input:

  • env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
  • gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
  • theta: This is a very small positive number that is used to decide if the policy evaluation step has sufficiently converged to the true value function (default value: 1e-8).

The algorithm returns as output:

  • policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.

Please complete the function in the code cell below. You are strongly encouraged to use the policy_evaluation and policy_improvement functions you implemented above.

In [15]:
def policy_iteration(env, gamma=1, theta=1e-8):
    """this function starts with a trivial guess for a policy and then alternates 
    between policy evaluation and policy improvement:
    - from the policy, get the state-value function V approximately through iteration 
    (already implemented above)
    - with the state-value function given, improve the policy (implemented above)
    - these steps are repeated until the policy becomes stable (the improvement step 
    changes the policy by less than theta)
    """
    
    # initialize the policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    
    while True:
        # get the state-value function for the current policy
        V = policy_evaluation(env, policy, gamma, theta)
        
        # use the state-value function to improve the policy
        next_policy = policy_improvement(env, V, gamma)
        
        # measure how much the improvement step changed the policy
        delta = np.max(np.abs(policy - next_policy))
        
        # the improved policy becomes the policy for the next iteration
        # (or the policy that gets returned)
        policy = next_policy.copy()
        
        # break the iteration once the policy is stable
        if delta < theta:
            break
    
    # policy and state-value function are returned
    return policy, V

Run the next code cell to solve the MDP and visualize the output. The optimal state-value function has been reshaped to match the shape of the gridworld.

Compare the optimal state-value function to the state-value function from Part 1 of this notebook. Is the optimal state-value function consistently greater than or equal to the state-value function for the equiprobable random policy?

In [16]:
# obtain the optimal policy and optimal state-value function
policy_pi, V_pi = policy_iteration(env)

# print the optimal policy
print("\nOptimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):")
print(policy_pi,"\n")

plot_values(V_pi)
Optimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):
[[ 1.  0.  0.  0.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  0.  1.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  0.  0.  1.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]] 
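
To answer the question above, you can compare the two value functions directly; every entry of V_pi should be at least as large as the corresponding entry of V. This optional check assumes V from Part 1 is still in memory.

In [ ]:
# is the optimal state-value function everywhere >= the value function 
# of the equiprobable random policy?
print((V_pi >= V).all())

# elementwise improvement over the random policy, shown on the gridworld
print((V_pi - V).reshape(4, 4))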

Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the policy_iteration function satisfies the requirements outlined above (with three inputs, two outputs, and with the default values of the input arguments unchanged).

In [17]:
check_test.run_check('policy_iteration_check', policy_iteration)

PASSED

Part 5: Truncated Policy Iteration

In this section, you will write your own implementation of truncated policy iteration.

You will begin by implementing truncated policy evaluation. Your algorithm should accept five arguments as input:

  • env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
  • policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
  • max_it: This is a positive integer that corresponds to the number of sweeps through the state space (default value: 1).
  • gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).

The algorithm returns as output:

  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.

Please complete the function in the code cell below.

In [18]:
def truncated_policy_evaluation(env, policy, V, max_it=1, gamma=1):
    """truncated policy evaluation gets a policy and a state-value function that serves 
    as an initial guess, then performs a fixed number of sweeps to approximate the 
    state-value function for that policy
    
    this time we do not iterate until the state-value function becomes stable, but only 
    a fixed number of times: that is why the method is called 'truncated'
    """
        
    # iterate a fixed number of times
    for _ in range(max_it):
        
        # one sweep of policy evaluation over all states
        for s in range(env.nS):
            
            # calculate the next value for s, reusing the already updated state values
            v_s = 0
            
            # get the action values for that state
            q_s = q_from_v(env, V, s, gamma)
            
            # for all actions under the policy
            for a, p_action in enumerate(policy[s]):
                             
                # the new state value is the sum over actions of 
                # (probability of the action) * (action value of that action)
                v_s += p_action * q_s[a]

            # the state value is replaced in place, so it is immediately used 
            # when calculating the next states
            V[s] = v_s
    
    # return the state-value function
    return V
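
As an optional check (assuming random_policy and policy_evaluation from Part 1 are still available), the cell below shows that truncated evaluation approaches the fully converged evaluation of the random policy as max_it grows.

In [ ]:
# optional check: with more sweeps, truncated evaluation approaches the 
# fully converged evaluation of the random policy
V_full = policy_evaluation(env, random_policy)

for n_sweeps in [1, 5, 50, 500]:
    V_trunc = truncated_policy_evaluation(env, random_policy, np.zeros(env.nS), max_it=n_sweeps)
    print(n_sweeps, np.max(np.abs(V_trunc - V_full)))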

Next, you will implement truncated policy iteration. Your algorithm should accept five arguments as input:

  • env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
  • max_it: This is a positive integer that corresponds to the number of sweeps through the state space (default value: 1).
  • gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
  • theta: This is a very small positive number that is used for the stopping criterion (default value: 1e-8).

The algorithm returns as output:

  • policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.

Please complete the function in the code cell below.

In [19]:
def truncated_policy_iteration(env, max_it=1, gamma=1, theta=1e-8):
    """truncated policy iteration alternates, as before, between (truncated) policy 
    evaluation and policy improvement, starting from a trivial policy
    
    the iteration stops once the state-value function becomes stable"""
    V = np.zeros(env.nS)
    policy = np.zeros([env.nS, env.nA])
    
    while True:
        
        # policy improvement step
        policy = policy_improvement(env, V, gamma)
        
        # copy V for comparison
        V_last = V.copy()
               
        # truncated evaluation of the current policy
        V = truncated_policy_evaluation(env, policy, V, max_it, gamma)
        
        # maximal difference between the old and new state-value functions
        delta = np.max(np.abs(V - V_last))
        
        if delta < theta:
            break
    
    return policy, V

Run the next code cell to solve the MDP and visualize the output. The state-value function has been reshaped to match the shape of the gridworld.

Play with the value of the max_it argument. Do you always end with the optimal state-value function?

In [20]:
policy_tpi, V_tpi = truncated_policy_iteration(env, max_it=2)

# print the optimal policy
print("\nOptimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):")
print(policy_tpi,"\n")

# plot the optimal state-value function
plot_values(V_tpi)
Optimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):
[[ 1.  0.  0.  0.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  0.  1.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  0.  0.  1.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]] 
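
To explore the question above, one option is to compare the value function returned for several settings of max_it against the result from policy iteration (this assumes V_pi from Part 4 is still in memory).

In [ ]:
# how close does truncated policy iteration get for different values of max_it?
for n_sweeps in [1, 2, 5, 10]:
    _, V_try = truncated_policy_iteration(env, max_it=n_sweeps)
    print(n_sweeps, np.max(np.abs(V_try - V_pi)))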

Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the truncated_policy_iteration function satisfies the requirements outlined above (with four inputs, two outputs, and with the default values of the input arguments unchanged).

In [21]:
check_test.run_check('truncated_policy_iteration_check', truncated_policy_iteration)

PASSED

Part 6: Value Iteration

In this section, you will write your own implementation of value iteration.

Your algorithm should accept three arguments as input:

  • env: This is an instance of an OpenAI Gym environment, where env.P returns the one-step dynamics.
  • gamma: This is the discount rate. It must be a value between 0 and 1, inclusive (default value: 1).
  • theta: This is a very small positive number that is used for the stopping criterion (default value: 1e-8).

The algorithm returns as output:

  • policy: This is a 2D numpy array with policy.shape[0] equal to the number of states (env.nS), and policy.shape[1] equal to the number of actions (env.nA). policy[s][a] returns the probability that the agent takes action a while in state s under the policy.
  • V: This is a 1D numpy array with V.shape[0] equal to the number of states (env.nS). V[s] contains the estimated value of state s.
In [22]:
def value_iteration(env, gamma=1, theta=1e-8):
    """value iteration is a shortcut for policy iteration: evaluation and improvement 
    are merged into a single sweep in which each state value is updated directly to 
    the maximum action value, which saves many computations"""
    
    # start with an initial state-value function
    V = np.zeros(env.nS)
    
    while True:
        
        delta = 0
    
        # update the value of every state
        for s in range(env.nS):
         
            # the old value is remembered
            v_s = V[s]

            # the action values are calculated for all actions
            q_s = q_from_v(env, V, s, gamma)

            # the next state value is the maximum of the action values
            V[s] = np.max(q_s)

            # the difference is recorded
            delta = max(delta, abs(V[s] - v_s))
    
        # the iteration ends when the differences for all states are small enough
        if delta < theta:
            break
            
    # the policy is obtained greedily from the final state-value function 
    policy = policy_improvement(env, V, gamma)    
    
    # the policy and the state-value function are returned
    return policy, V

Use the next code cell to solve the MDP and visualize the output. The state-value function has been reshaped to match the shape of the gridworld.

In [23]:
policy_vi, V_vi = value_iteration(env)

# print the optimal policy
print("\nOptimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):")
print(policy_vi,"\n")

# plot the optimal state-value function
plot_values(V_vi)
Optimal Policy (LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3):
[[ 1.  0.  0.  0.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  0.  1.]
 [ 0.  0.  0.  1.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  0.  0.  1.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 1.  0.  0.  0.]
 [ 0.  0.  1.  0.]
 [ 0.  1.  0.  0.]
 [ 1.  0.  0.  0.]] 
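
As a final cross-check (assuming policy_pi and V_pi from Part 4 are still in memory), value iteration and policy iteration should agree on the optimal state-value function, and the printed outputs above suggest they also produce the same greedy policy.

In [ ]:
# largest difference between the value functions from value iteration and policy iteration
print(np.max(np.abs(V_vi - V_pi)))

# do the two methods produce the same greedy policy?
print(np.array_equal(policy_vi, policy_pi))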

Run the code cell below to test your function. If the code cell returns PASSED, then you have implemented the function correctly!

Note: In order to ensure accurate results, make sure that the value_iteration function satisfies the requirements outlined above (with three inputs, two outputs, and with the default values of the input arguments unchanged).

In [24]:
check_test.run_check('value_iteration_check', value_iteration)

PASSED