
Deep reinforcement learning with Pong from pixels


This article is not currently tested due to licensing/installation issues with
the underlying `gym` and `atari-py` dependencies.
Help improve this article by developing an example with reduced dependency
footprint!

This tutorial demonstrates how to implement a deep reinforcement learning (RL) agent from scratch with NumPy. The agent uses a policy gradient method to learn to play the Pong video game, taking screen pixels as inputs. Your Pong agent will gain experience on the go, using an artificial neural network as its policy.

Pong is a 2D game from 1972 where two players use “rackets” to play a form of table tennis. Each player moves the racket up and down the screen and tries to hit a ball in their opponent’s direction by touching it. The goal is to hit the ball such that it goes past the opponent’s racket (they miss their shot). According to the rules, if a player reaches 21 points, they win. In Pong, the RL agent that learns to play against an opponent is displayed on the right.

Diagram showing operations detailed in this tutorial

This example is based on the code developed by Andrej Karpathy for the Deep RL Bootcamp in 2017 at UC Berkeley. His blog post from 2016 also provides more background on the mechanics and theory used in Pong RL.

Prerequisites

This tutorial can also be run locally in an isolated environment, such as Virtualenv or conda.


A note on RL and deep RL

In RL, your agent learns from trial and error by interacting with an environment using a so-called policy to gain experience. After taking one action, the agent receives information about its reward (which it may or may not get) and the next observation of the environment. It can then proceed to take another action. This happens over a number of episodes and/or until the task is deemed to be complete.

The agent’s policy works by “mapping” the agent’s observations to its actions, that is, assigning a representation of what the agent observes to the actions it should take. The overall goal is usually to optimize the agent’s policy such that it maximizes the expected rewards from each observation.

For detailed information about RL, there is an introductory book by Richard Sutton and Andrew Barto.

Check out the Appendix at the end of the tutorial for more information.

Deep RL glossary

Below is a concise glossary of deep RL terms you may find useful for the remaining part of the tutorial:

You will train your Pong agent through an “on-policy” method using policy gradients, an algorithm belonging to the family of policy-based methods. Policy gradient methods typically update the parameters of the policy with respect to the long-term cumulative reward using gradient descent, which is widely used in machine learning. Since the goal is to maximize the function (the rewards) rather than minimize it, the process is also called gradient ascent. In other words, the agent takes actions according to a policy, and you maximize the rewards by computing the gradients and using them to update the parameters of the policy (neural) network.
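To make the update rule concrete, here is a minimal sketch of a policy gradient step on a toy problem rather than on Pong. It assumes a hypothetical two-action task in which action 1 always earns +1 and action 0 always earns -1, with a policy defined by a single parameter `theta`. None of these names come from the tutorial's code.

```python
import numpy as np

rng = np.random.default_rng(0)


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


theta = 0.0  # The single policy parameter (hypothetical toy setup).
learning_rate = 0.1

for _ in range(500):
    p = sigmoid(theta)  # Probability of choosing action 1.
    y = 1 if rng.uniform() < p else 0  # Sample an action from the policy.
    reward = 1.0 if y == 1 else -1.0  # Action 1 is rewarded, action 0 is penalized.
    # The gradient of the log probability of the sampled action
    # with respect to theta is (y - p). Scale it by the reward
    # and step uphill (gradient ascent).
    theta += learning_rate * reward * (y - p)

final_p = sigmoid(theta)
print(final_p)
```

In this toy setup every update pushes `theta` upward, so the probability of the rewarded action grows. In Pong, rewards are delayed and noisy, which is why the tutorial later adds discounting and normalization.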

Set up Pong

1. First, you should install OpenAI Gym (using pip install gym[atari] - this package is currently not available on conda), and import NumPy, Gym and the necessary modules:

import numpy as np
import gym

Gym can monitor and save the output using the Monitor wrapper:

from gym import wrappers
from gym.wrappers import Monitor

2. Instantiate a Gym environment for the game of Pong:

env = gym.make("Pong-v0")

3. Let’s review which actions are available in the Pong-v0 environment:

print(env.action_space)
print(env.get_action_meanings())

There are 6 actions. However, LEFTFIRE is actually LEFT, RIGHTFIRE is actually RIGHT, and FIRE is equivalent to NOOP.

For simplicity, your policy network will have one output — a (log) probability for “moving up” (indexed at 2 or RIGHT). The other available action will be indexed at 3 (“move down” or LEFT).
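As a rough illustration of how a single output can drive two actions, the sketch below samples action `2` with a given probability and action `3` otherwise. The helper name `sample_action` and the probability value `0.8` are assumptions made for this example only.

```python
import numpy as np

rng = np.random.default_rng(0)


def sample_action(p_up, rng):
    # Choose action 2 ("move up") with probability p_up,
    # and action 3 ("move down") otherwise.
    return 2 if rng.uniform() < p_up else 3


# Draw many actions with a hypothetical "move up" probability of 0.8.
actions = np.array([sample_action(0.8, rng) for _ in range(10000)])
frac_up = np.mean(actions == 2)
print(frac_up)
```

The fraction of "move up" actions should land close to the probability passed in, which is the behavior the training loop relies on.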

4. Gym can save videos of the agent’s learning in an MP4 format — wrap Monitor() around the environment by running the following:

env = Monitor(env, "./video", force=True)

While you can perform all kinds of RL experiments in a Jupyter notebook, rendering images or videos of a Gym environment to visualize how your agent plays the game of Pong after training can be rather challenging. If you want to set up video playback in a notebook, you can find the details in the Appendix at the end of this tutorial.

Preprocess frames (the observation)

In this section you will set up a function to preprocess the input data (game observation) to make it digestible for the neural network, which can only work with inputs in the form of tensors (multidimensional arrays) of floating-point type.

Your agent will use frames from the Pong game (pixels from screen frames) as input observations for the policy network. The game observation tells the agent where the ball is; it is preprocessed and then fed (with a forward pass) into the neural network (the policy). This is similar to DeepMind’s DQN method (which is further discussed in the Appendix).

Pong screen frames are 210x160 pixels over 3 color dimensions (red, green and blue). The arrays are encoded with uint8 (or 8-bit integers), and these observations are stored on a Gym Box instance.

1. Check Pong’s observations:

print(env.observation_space)

In Gym, the agent’s actions and observations can be part of the Box (n-dimensional) or Discrete (fixed-range integers) classes.

2. You can view a random observation — one frame — by:

1) Setting the random `seed` before initialization (optional).

2) Calling Gym's `reset()` to reset the environment, which returns an initial observation.

3) Using Matplotlib to display the `render`ed observation.

(You can refer to the OpenAI Gym core API for more information about Gym’s core classes and methods.)

import matplotlib.pyplot as plt

env.seed(42)
env.reset()
random_frame = env.render(mode="rgb_array")
print(random_frame.shape)
plt.imshow(random_frame)

To feed the observations into the policy (neural) network, you need to convert them into 1D grayscale vectors of 6,400 (80x80x1) floating-point values. (During training, you will use NumPy’s np.ravel() function to flatten these arrays.)

3. Set up a helper function for frame (observation) preprocessing:

def frame_preprocessing(observation_frame):
    # Crop the frame.
    observation_frame = observation_frame[35:195]
    # Downsample the frame by a factor of 2.
    observation_frame = observation_frame[::2, ::2, 0]
    # Remove the background and apply other enhancements.
    observation_frame[observation_frame == 144] = 0  # Erase the background (type 1).
    observation_frame[observation_frame == 109] = 0  # Erase the background (type 2).
    observation_frame[observation_frame != 0] = 1  # Set the items (rackets, ball) to 1.
    # Return the preprocessed frame as a 2D (80x80) floating-point array.
    # It is flattened into a 1D array later, during training.
    return observation_frame.astype(float)

4. Preprocess the random frame from earlier to test the function. The input for the policy network is an 80x80 image that is flattened into a 1D array during training:

preprocessed_random_frame = frame_preprocessing(random_frame)
plt.imshow(preprocessed_random_frame, cmap="gray")
print(preprocessed_random_frame.shape)
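If Gym is not installed, you can still try the same cropping and downsampling steps on a synthetic frame. The sketch below is an approximation under stated assumptions: the fake frame, the "ball" pixel value of 236, and the helper name `preprocess` are made up for illustration. It also adds a `.copy()` so the caller's frame is left untouched, which the tutorial's function does not do.

```python
import numpy as np


def preprocess(frame):
    # Crop the playing field, then keep every second pixel of the first channel.
    frame = frame[35:195]
    frame = frame[::2, ::2, 0].copy()  # Copy so the input frame is not modified.
    # Erase both background shades, then set everything else to 1.
    frame[frame == 144] = 0
    frame[frame == 109] = 0
    frame[frame != 0] = 1
    return frame.astype(float)


# A fake 210x160 RGB frame filled with one background shade...
fake_frame = np.full((210, 160, 3), 144, dtype=np.uint8)
# ...plus a small hypothetical "ball" patch with an arbitrary non-background value.
fake_frame[100:104, 80:82, :] = 236

processed = preprocess(fake_frame)
flat = processed.ravel()
print(processed.shape, flat.shape, processed.sum())
```

The 4x2 patch shrinks to two pixels after downsampling by a factor of 2, and flattening gives the 6,400-element vector the policy network expects.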

Create the policy (the neural network) and the forward pass

Next, you will define the policy as a simple feedforward network that uses a game observation as an input and outputs the probability of an action:

1. Let’s instantiate certain parameters for the input, hidden, and output layers, and start setting up the network model.

Start by creating a random number generator instance for the experiment (seeded for reproducibility):

rng = np.random.default_rng(seed=12288743)

Then:

D = 80 * 80
H = 200
model = {}

In a neural network, weights are important adjustable parameters that the network fine-tunes by forward and backward propagating the data.

2. Using a technique called Xavier initialization, set up the network model’s initial weights with NumPy’s Generator.standard_normal() that returns random numbers over a standard Normal distribution, as well as np.sqrt():

model["W1"] = rng.standard_normal(size=(H, D)) / np.sqrt(D)
model["W2"] = rng.standard_normal(size=H) / np.sqrt(H)
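To see why the weights are divided by the square root of the input size, the following sketch compares the spread of the hidden-layer pre-activations with and without that scaling. It assumes a random unit-variance input vector, which is only a stand-in for real frames.

```python
import numpy as np

rng = np.random.default_rng(0)
D, H = 80 * 80, 200

# Weights scaled as in the tutorial, and unscaled weights for comparison.
W_scaled = rng.standard_normal(size=(H, D)) / np.sqrt(D)
W_raw = rng.standard_normal(size=(H, D))

# A hypothetical input with roughly unit variance per element.
x = rng.standard_normal(D)

std_scaled = np.std(W_scaled @ x)
std_raw = np.std(W_raw @ x)
print(std_scaled, std_raw)
```

With the scaling, the pre-activations stay near unit spread. Without it, they grow with the square root of the input size, which would push the sigmoid output toward saturation.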

3. Your policy network starts with randomly initialized weights and feeds the input data (frames) forward from the input layer through a hidden layer to the output layer. This process is called the forward pass or forward propagation, and is outlined in the function policy_forward():

def policy_forward(x, model):
    # Matrix-multiply the weights by the input in the one and only hidden layer.
    h = np.dot(model["W1"], x)
    # Apply non-linearity with ReLU.
    h[h < 0] = 0
    # Calculate the "dot" product in the outer layer.
    # The input for the sigmoid function is called logit.
    logit = np.dot(model["W2"], h)
    # Apply the sigmoid function (non-linear activation).
    p = sigmoid(logit)
    # Return the probability of taking action 2 ("move up")
    # and the hidden "state" that you need for backpropagation.
    return p, h

Note that there are two activation functions for modeling non-linear relationships between inputs and outputs. These non-linear functions are applied to the outputs of the layers: ReLU (the `h[h < 0] = 0` step) on the hidden layer and sigmoid on the output layer.

4. Define the sigmoid function separately with NumPy’s np.exp() for computing exponentials:

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))
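The forward pass can be tried out on a much smaller network to check shapes and value ranges. This is a sketch only: the sizes `D_small` and `H_small` and the function name `forward` are assumptions, and `np.maximum` stands in for the in-place ReLU used above.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def forward(x, W1, W2):
    # Hidden layer with ReLU, then a single sigmoid output.
    h = np.maximum(W1 @ x, 0)
    p = sigmoid(W2 @ h)
    return p, h


rng = np.random.default_rng(0)
D_small, H_small = 16, 4  # Tiny, hypothetical layer sizes.
W1 = rng.standard_normal(size=(H_small, D_small)) / np.sqrt(D_small)
W2 = rng.standard_normal(size=H_small) / np.sqrt(H_small)

p, h = forward(rng.standard_normal(D_small), W1, W2)
print(p, h.shape)
```

The output is a single number strictly between 0 and 1, and the hidden activations are non-negative because of the ReLU.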

Set up the update step (backpropagation)

During learning in your deep RL algorithm, you use the action log probabilities (given an observation) and the discounted returns (for example, +1 or -1 in Pong) and perform the backward pass or backpropagation to update the parameters — the policy network’s weights.

1. Let’s define the backward pass function (policy_backward()) with the help of NumPy’s functions for array multiplication: np.dot() (matrix multiplication), np.outer() (outer product computation), and np.ravel() (to flatten arrays into 1D arrays):

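def policy_backward(eph, epdlogp, model):
    # Gradient for the output-layer weights.
    dW2 = np.dot(eph.T, epdlogp).ravel()
    # Backpropagate into the hidden layer.
    dh = np.outer(epdlogp, model["W2"])
    # Apply the ReLU derivative: no gradient flows through inactive units.
    dh[eph <= 0] = 0
    # Note: `epx` (the stacked episode inputs) is not a parameter; it is read
    # from the enclosing (global) scope, where the training loop defines it.
    dW1 = np.dot(dh.T, epx)
    # Return the gradients for both weight arrays (not the updated weights).
    return {"W1": dW1, "W2": dW2}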

Using the intermediate hidden “states” of the network (eph) and the gradients of action log probabilities (epdlogp) for an episode, the policy_backward function propagates the gradients back through the policy network and returns the gradients for the weights, which are used later to update them.
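One way to gain confidence in a hand-written backward pass is a finite-difference check on a tiny network. The sketch below is not part of the original code: it passes the inputs `epx` explicitly, uses made-up sizes, actions and advantages, and compares the analytic gradients with numerical ones for the objective "sum of advantage times log probability of the taken action".

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def forward_batch(epx, W1, W2):
    eph = np.maximum(epx @ W1.T, 0)  # Hidden activations, shape (T, H).
    p = sigmoid(eph @ W2)  # "Move up" probabilities, shape (T,).
    return p, eph


def objective(epx, y, adv, W1, W2):
    p, _ = forward_batch(epx, W1, W2)
    logp = y * np.log(p) + (1 - y) * np.log(1 - p)
    return np.sum(adv * logp)


def backward(epx, eph, epdlogp, W2):
    dW2 = (eph.T @ epdlogp).ravel()
    dh = np.outer(epdlogp, W2)
    dh[eph <= 0] = 0  # ReLU derivative.
    dW1 = dh.T @ epx
    return dW1, dW2


rng = np.random.default_rng(0)
T, D_small, H_small = 5, 6, 4  # Hypothetical tiny sizes.
epx = rng.standard_normal(size=(T, D_small))
W1 = rng.standard_normal(size=(H_small, D_small)) / np.sqrt(D_small)
W2 = rng.standard_normal(size=H_small) / np.sqrt(H_small)
y = rng.integers(0, 2, size=T).astype(float)  # Made-up actions (1 = "up").
adv = rng.standard_normal(T)  # Made-up advantages.

p, eph = forward_batch(epx, W1, W2)
epdlogp = (adv * (y - p)).reshape(-1, 1)
dW1, dW2 = backward(epx, eph, epdlogp, W2)

eps = 1e-6


def numeric_gradient(param):
    # Central differences, perturbing one weight at a time in place.
    grad = np.zeros_like(param)
    for idx in np.ndindex(param.shape):
        original = param[idx]
        param[idx] = original + eps
        up = objective(epx, y, adv, W1, W2)
        param[idx] = original - eps
        down = objective(epx, y, adv, W1, W2)
        param[idx] = original
        grad[idx] = (up - down) / (2 * eps)
    return grad


err_W1 = np.max(np.abs(numeric_gradient(W1) - dW1))
err_W2 = np.max(np.abs(numeric_gradient(W2) - dW2))
print(err_W1, err_W2)
```

Both errors should be tiny, which suggests the `np.dot`/`np.outer` formulation computes the gradient of that objective.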

2. When applying backpropagation during agent training, you will need to save several variables for each episode. Let’s instantiate empty lists to store them:

# All preprocessed observations for the episode.
xs = []
# All hidden "states" (from the network) for the episode.
hs = []
# All gradients of the action log probabilities
# for the episode.
dlogps = []
# All rewards for the episode.
drs = []

You will reset these variables manually at the end of each episode during training, after stacking their contents with NumPy’s np.vstack(). This is demonstrated in the training stage towards the end of the tutorial.

3. Next, to perform gradient ascent when optimizing the agent’s policy, it is common to use deep learning optimizers (you’re performing optimization with gradients). In this example, you’ll use RMSProp, an adaptive optimization method. Let’s set a discounting factor (a decay rate) for the optimizer:

decay_rate = 0.99

4. You will also need to store the gradients (with the help of NumPy’s np.zeros_like()) for the optimization step during training:

grad_buffer = {k: np.zeros_like(v) for k, v in model.items()}
rmsprop_cache = {k: np.zeros_like(v) for k, v in model.items()}
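The role of the decay rate and the cache is easier to see on a one-dimensional problem. The sketch below applies the same kind of RMSProp ascent update to a hypothetical objective, `-(theta - 3)**2`, whose maximum is at 3. The function name, the learning rate and the objective are assumptions chosen for illustration.

```python
import numpy as np


def rmsprop_ascent_step(param, grad, cache, learning_rate=1e-2, decay_rate=0.99):
    # Keep a running average of squared gradients...
    cache = decay_rate * cache + (1 - decay_rate) * grad**2
    # ...and take a step uphill, scaled by its square root.
    param = param + learning_rate * grad / (np.sqrt(cache) + 1e-5)
    return param, cache


def toy_objective(theta):
    return -((theta - 3.0) ** 2)


theta = np.array([0.0])
cache = np.zeros_like(theta)
start_value = toy_objective(theta)[0]

for _ in range(2000):
    grad = -2.0 * (theta - 3.0)  # Gradient of the toy objective.
    theta, cache = rmsprop_ascent_step(theta, grad, cache)

end_value = toy_objective(theta)[0]
print(theta, end_value)
```

Because the step is divided by the running gradient magnitude, its size depends mostly on the learning rate rather than on the raw gradient scale.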

Define the discounted rewards (expected return) function

In this section, you will set up a function for computing discounted rewards (discount_rewards()), the expected return from an observation, that takes a 1D array of rewards as input (with the help of NumPy’s np.zeros_like() function).

To provide more weight to shorter-term rewards over longer-term ones, you will use a discount factor (gamma) that is often a floating-point number between 0.9 and 0.99.

gamma = 0.99


def discount_rewards(r, gamma):
    discounted_r = np.zeros_like(r)
    running_add = 0
    # From the last reward to the first...
    for t in reversed(range(0, r.size)):
        # ...reset the running sum at a game boundary
        # (in Pong, a non-zero reward means a point was scored)
        if r[t] != 0:
            running_add = 0
        # ...compute the discounted reward
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r
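A small worked example may help. The sketch below repeats the same logic under a different, hypothetical name (`discount`) and runs it on a made-up reward sequence, with `gamma = 0.5` chosen only to keep the arithmetic easy to follow.

```python
import numpy as np


def discount(r, gamma):
    discounted = np.zeros_like(r)
    running_add = 0.0
    for t in reversed(range(r.size)):
        # A non-zero reward marks the end of a rally, so restart the sum.
        if r[t] != 0:
            running_add = 0.0
        running_add = running_add * gamma + r[t]
        discounted[t] = running_add
    return discounted


# Two rallies: the agent wins the first (+1) and loses the second (-1).
rewards = np.array([0.0, 0.0, 1.0, 0.0, -1.0])
result = discount(rewards, gamma=0.5)
print(result)
```

Each step before a point inherits a share of that point's reward, halved for every step of distance, and the reset keeps the second rally's loss from leaking into the first rally. Note that the reward array must be floating-point, since `np.zeros_like()` copies the input's data type.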

Train the agent for a number of episodes

This section covers how to set up the training process during which your agent will be learning to play Pong using its policy.

The pseudocode for the policy gradient method for Pong:

Diagram showing operations detailed in this tutorial

You can stop the training at any time and/or check the MP4 videos of saved plays on your disk in the ./video directory. You can set the maximum number of episodes to a value that is more appropriate for your setup.

1. For demo purposes, let’s limit the number of episodes for training to 3. If you have faster hardware or more time, you can increase the number to 1,000 or beyond. For comparison, Andrej Karpathy’s original experiment took about 8,000 episodes.

max_episodes = 3

2. Set the batch size and the learning rate values:

batch_size = 3
learning_rate = 1e-4

3. Set the game rendering default variable for Gym’s render method (it is used to display the observation and is optional but can be useful during debugging):

render = False

4. Set the agent’s initial (random) observation by calling reset():

observation = env.reset()

5. Initialize the previous observation:

prev_x = None

6. Initialize the reward variables and the episode count:

running_reward = None
reward_sum = 0
episode_number = 0

7. To simulate motion between the frames, set the single input frame (x) for the policy network as the difference between the current and previous preprocessed frames:

def update_input(prev_x, cur_x, D):
    if prev_x is not None:
        x = cur_x - prev_x
    else:
        x = np.zeros(D)
    return x
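To see what the difference frame encodes, consider a one-dimensional stand-in for a frame in which a single "ball" pixel moves one position to the right. The arrays below are made up for illustration.

```python
import numpy as np

# Hypothetical flattened frames: the ball moves from index 3 to index 4.
prev_frame = np.zeros(8)
prev_frame[3] = 1.0
cur_frame = np.zeros(8)
cur_frame[4] = 1.0

diff = cur_frame - prev_frame
print(diff)
```

The result is -1 where the ball used to be and +1 where it is now, so a single input vector carries direction information. Static items, such as a racket that did not move, cancel out to 0.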

8. Finally, start the training loop, using the functions you have predefined:


while episode_number < max_episodes:
    # (For rendering.)
    if render:
        env.render()

    # 1. Preprocess the observation (a game frame) and flatten with NumPy's `ravel()`.
    cur_x = frame_preprocessing(observation).ravel()

    # 2. Instantiate the observation for the policy network
    x = update_input(prev_x, cur_x, D)
    prev_x = cur_x

    # 3. Perform the forward pass through the policy network using the observations
    # (preprocessed frames as inputs) and store the action log probabilities
    # and hidden "states" (for backpropagation) during the course of each episode.
    aprob, h = policy_forward(x, model)
    # 4. Sample an action: choose the action indexed at `2` ("move up")
    # with probability `aprob` (that is, if a uniform random sample is below it),
    # or choose action `3` ("move down") otherwise.
    action = 2 if rng.uniform() < aprob else 3

    # 5. Cache the observations and hidden "states" (from the network)
    # in separate variables for backpropagation.
    xs.append(x)
    hs.append(h)

    # 6. Compute the gradients of action log probabilities:
    # - If the action was to "move up" (index `2`):
    y = 1 if action == 2 else 0
    # - The cross-entropy:
    # `y*log(aprob) + (1 - y)*log(1-aprob)`
    # or `log(aprob)` if y = 1, else: `log(1 - aprob)`.
    # (Recall: you used the sigmoid function (`1/(1+np.exp(-x)`) to output
    # `aprob` action probabilities.)
    # - Then the gradient: `y - aprob`.
    # 7. Append the gradients of your action log probabilities.
    dlogps.append(y - aprob)
    # 8. Take an action and update the parameters with Gym's `step()`
    # function; obtain a new observation.
    observation, reward, done, info = env.step(action)
    # 9. Update the total sum of rewards.
    reward_sum += reward
    # 10. Append the reward for the previous action.
    drs.append(reward)

    # After an episode is finished:
    if done:
        episode_number += 1
        # 11. Collect and reshape stored values with `np.vstack()` of:
        # - Observation frames (inputs),
        epx = np.vstack(xs)
        # - hidden "states" (from the network),
        eph = np.vstack(hs)
        # - gradients of action log probabilities,
        epdlogp = np.vstack(dlogps)
        # - and received rewards for the past episode.
        epr = np.vstack(drs)

        # 12. Reset the stored variables for the new episode:
        xs = []
        hs = []
        dlogps = []
        drs = []

        # 13. Discount the rewards for the past episode using the helper
        # function you defined earlier...
        discounted_epr = discount_rewards(epr, gamma)
        # ...and normalize them because they have high variance
        # (this is explained below.)
        discounted_epr -= np.mean(discounted_epr)
        discounted_epr /= np.std(discounted_epr)

        # 14. Multiply the discounted rewards by the gradients of the action
        # log probabilities (the "advantage").
        epdlogp *= discounted_epr
        # 15. Use the gradients to perform backpropagation and gradient ascent.
        grad = policy_backward(eph, epdlogp, model)
        # 16. Save the policy gradients in a buffer.
        for k in model:
            grad_buffer[k] += grad[k]
        # 17. Use the RMSProp optimizer to perform the policy network
        # parameter (weight) update once per batch
        # (that is, every `batch_size` episodes).
        if episode_number % batch_size == 0:
            for k, v in model.items():
                # The gradient.
                g = grad_buffer[k]
                # Use the RMSProp discounting factor.
                rmsprop_cache[k] = (
                    decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g ** 2
                )
                # Update the policy network with a learning rate
                # and the RMSProp optimizer using gradient ascent
                # (hence, there's no negative sign)
                model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
                # Reset the gradient buffer at the end.
                grad_buffer[k] = np.zeros_like(v)

        # 18. Track a running mean of the total episode reward.
        running_reward = (
            reward_sum
            if running_reward is None
            else running_reward * 0.99 + reward_sum * 0.01
        )
        print(
            "Resetting the Pong environment. Episode total reward: {} Running mean: {}".format(
                reward_sum, running_reward
            )
        )

        # 19. Set the agent's initial observation by calling Gym's `reset()` function
        # for the next episode and setting the reward sum back to 0.
        reward_sum = 0
        observation = env.reset()
        prev_x = None

    # 20. Display the output during training.
    if reward != 0:
        print(
            "Episode {}: Game finished. Reward: {}...".format(episode_number, reward)
            + ("" if reward == -1 else " POSITIVE REWARD!")
        )
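Steps 13 and 14 of the loop above can be looked at in isolation. The sketch below standardizes a made-up column of discounted rewards and multiplies it by made-up `y - aprob` values. Both arrays are hypothetical and shaped like the `np.vstack()` outputs in the loop.

```python
import numpy as np

# Hypothetical discounted rewards for a 5-step episode, shape (5, 1).
discounted = np.array([[0.25], [0.5], [1.0], [-0.5], [-1.0]])

# Standardize: zero mean and unit standard deviation.
normalized = (discounted - np.mean(discounted)) / np.std(discounted)

# Hypothetical gradients of the action log probabilities (y - aprob).
dlogps = np.array([[0.4], [-0.6], [0.3], [0.7], [-0.2]])

# Weight each gradient by how good the outcome of its action was.
weighted = dlogps * normalized
print(normalized.ravel())
print(weighted.ravel())
```

After standardization, roughly half of the actions in an episode are encouraged and half are discouraged, which tends to reduce the variance of the gradient estimate.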

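A note: if you want to rerun the experiment, you may first need to close the environment (and its Monitor wrapper) by uncommenting and running the line below: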

# env.close()

Next steps

You may notice that training an RL agent takes a long time if you increase the number of episodes from 100 to 500 or 1,000+, depending on the hardware — CPUs and GPUs — you are using for this task.

Policy gradient methods can learn a task if you give them a lot of time, and optimization in RL is a challenging problem. Training agents to learn to play Pong or any other task can be sample-inefficient and require a lot of episodes. You may also notice in your training output that even after hundreds of episodes, the rewards may have high variance.

In addition, as with many deep learning-based algorithms, you should take into account the large number of parameters that your policy has to learn. In Pong, this number adds up to more than 1 million with 200 nodes in the hidden layer of the network and an input dimension of 6,400 (80x80). Therefore, adding more CPUs and GPUs to assist with training can always be an option.
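The parameter count follows directly from the layer sizes used in this tutorial, as this short calculation shows:

```python
D = 80 * 80  # Input dimension (flattened 80x80 frame).
H = 200  # Number of hidden-layer nodes.

n_w1 = H * D  # Input-to-hidden weights (the shape of W1 is H x D).
n_w2 = H  # Hidden-to-output weights (W2 has H entries).
total_params = n_w1 + n_w2
print(total_params)  # 1280200
```

Almost all of the parameters sit in the first layer, because every hidden node is connected to every input pixel.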

You can use a much more advanced policy gradient-based algorithm that can help speed up training, reduce the sensitivity to hyperparameters, and resolve other issues. For example, Proximal Policy Optimization (PPO), developed by John Schulman et al. in 2017, was combined with self-play to train the OpenAI Five agent over 10 months to play Dota 2 at a competitive level. Of course, if you apply these methods to smaller Gym environments, training should take hours, not months.

In general, there are many RL challenges and possible solutions and you can explore some of them in Reinforcement learning, fast and slow by Matthew Botvinick, Sam Ritter, Jane X. Wang, Zeb Kurth-Nelson, Charles Blundell, and Demis Hassabis (2019).


If you want to learn more about deep RL, you should check out the following free educational material:

Building a neural network from scratch with NumPy is a great way to learn more about NumPy and about deep learning. However, for real-world applications you should use specialized frameworks — such as PyTorch, JAX, TensorFlow or MXNet — that provide NumPy-like APIs, have built-in automatic differentiation and GPU support, and are designed for high-performance numerical computing and machine learning.

Appendix

Notes on RL and deep RL

How to set up video playback in your Jupyter notebook

  1. If you’re using Google Colaboratory, run the following commands in the notebook cells to help with video playback:

     # Install Xvfb and X11 dependencies.
     !apt-get install -y xvfb x11-utils > /dev/null 2>&1
     # To work with videos, install FFmpeg.
     !apt-get install -y ffmpeg > /dev/null 2>&1
     # Install PyVirtualDisplay for visual feedback and other libraries/dependencies.
     !pip install pyvirtualdisplay PyOpenGL PyOpenGL-accelerate > /dev/null 2>&1
    
  2. Then, add this Python code:

     # Import the virtual display module.
     from pyvirtualdisplay import Display
     # Import ipythondisplay and HTML from IPython for image and video rendering.
     from IPython import display as ipythondisplay
     from IPython.display import HTML
    
     # Initialize the virtual buffer at 400x300 (adjustable size).
     # With Xvfb, you should set `visible=False`.
     display = Display(visible=False, size=(400, 300))
     display.start()
    
     # Check that no display is present.
     # If no displays are present, the expected output is `:0`.
     !echo $DISPLAY
    
     # Define a helper function to display videos in Jupyter notebooks.
     # (Source: https://star-ai.github.io/Rendering-OpenAi-Gym-in-Colaboratory/)
    
     import sys
     import math
     import glob
     import io
     import base64
    
     def show_any_video(mp4video=0):
         mp4list = glob.glob('video/*.mp4')
         if len(mp4list) > 0:
             mp4 = mp4list[mp4video]
             video = io.open(mp4, 'r+b').read()
             encoded = base64.b64encode(video)
             ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                                                 loop controls style="height: 400px;">
                                                 <source src="data:video/mp4;base64,{0}" type="video/mp4" />
                                                 </video>'''.format(encoded.decode('ascii'))))
    
         else:
             print('Could not find the video!')