Skip to content

Instantly share code, notes, and snippets.

@denisyarats
Created June 15, 2020 21:25
Show Gist options
  • Save denisyarats/057c704a2816e48e8faebff073b864b0 to your computer and use it in GitHub Desktop.
Save denisyarats/057c704a2816e48e8faebff073b864b0 to your computer and use it in GitHub Desktop.
# coding=utf-8
# Copyright 2019 The SEED Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A class implementing minimal Atari 2600 preprocessing.
Adapted from Dopamine.
"""
from gym.spaces.box import Box
import numpy as np
import gym
from collections import deque
import cv2
class AtariPreprocessing(object):
"""A class implementing image preprocessing for Atari 2600 agents.
Specifically, this provides the following subset from the JAIR paper
(Bellemare et al., 2013) and Nature DQN paper (Mnih et al., 2015):
* Frame skipping (defaults to 4).
* Terminal signal when a life is lost (off by default).
* Grayscale and max-pooling of the last two frames.
* Downsample the screen to a square image (defaults to 84x84).
More generally, this class follows the preprocessing guidelines set down in
Machado et al. (2018), "Revisiting the Arcade Learning Environment:
Evaluation Protocols and Open Problems for General Agents".
It also provides random starting no-ops, which are used in the Rainbow, Apex
and R2D2 papers.
"""
def __init__(self,
environment,
frame_skip=4,
terminal_on_life_loss=False,
screen_size=84,
max_random_noops=0):
"""Constructor for an Atari 2600 preprocessor.
Args:
environment: Gym environment whose observations are preprocessed.
frame_skip: int, the frequency at which the agent experiences the game.
terminal_on_life_loss: bool, If True, the step() method returns
is_terminal=True whenever a life is lost. See Mnih et al. 2015.
screen_size: int, size of a resized Atari 2600 frame.
max_random_noops: int, maximum number of no-ops to apply at the beginning
of each episode to reduce determinism. These no-ops are applied at a
low-level, before frame skipping.
Raises:
ValueError: if frame_skip or screen_size are not strictly positive.
"""
if frame_skip <= 0:
raise ValueError(
'Frame skip should be strictly positive, got {}'.format(
frame_skip))
if screen_size <= 0:
raise ValueError(
'Target screen size should be strictly positive, got {}'.
format(screen_size))
self.environment = environment
self.terminal_on_life_loss = terminal_on_life_loss
self.frame_skip = frame_skip
self.screen_size = screen_size
self.max_random_noops = max_random_noops
obs_dims = self.environment.observation_space
# Stores temporary observations used for pooling over two successive
# frames.
self.screen_buffer = [
np.empty((obs_dims.shape[0], obs_dims.shape[1]), dtype=np.uint8),
np.empty((obs_dims.shape[0], obs_dims.shape[1]), dtype=np.uint8)
]
self.game_over = False
self.lives = 0 # Will need to be set by reset().
@property
def observation_space(self):
# Return the observation space adjusted to match the shape of the processed
# observations.
return Box(low=0,
high=255,
shape=(self.screen_size, self.screen_size, 1),
dtype=np.uint8)
@property
def action_space(self):
return self.environment.action_space
@property
def reward_range(self):
return self.environment.reward_range
@property
def metadata(self):
return self.environment.metadata
def close(self):
return self.environment.close()
def apply_random_noops(self):
"""Steps self.environment with random no-ops."""
if self.max_random_noops <= 0:
return
# Other no-ops implementations actually always do at least 1 no-op. We
# follow them.
no_ops = self.environment.np_random.randint(1,
self.max_random_noops + 1)
for _ in range(no_ops):
_, _, game_over, _ = self.environment.step(0)
if game_over:
self.environment.reset()
def reset(self):
"""Resets the environment.
Returns:
observation: numpy array, the initial observation emitted by the
environment.
"""
self.environment.reset()
self.apply_random_noops()
self.lives = self.environment.ale.lives()
self._fetch_grayscale_observation(self.screen_buffer[0])
self.screen_buffer[1].fill(0)
return self._pool_and_resize()
def render(self, mode):
"""Renders the current screen, before preprocessing.
This calls the Gym API's render() method.
Args:
mode: Mode argument for the environment's render() method.
Valid values (str) are:
'rgb_array': returns the raw ALE image.
'human': renders to display via the Gym renderer.
Returns:
if mode='rgb_array': numpy array, the most recent screen.
if mode='human': bool, whether the rendering was successful.
"""
return self.environment.render(mode)
def step(self, action):
"""Applies the given action in the environment.
Remarks:
* If a terminal state (from life loss or episode end) is reached, this may
execute fewer than self.frame_skip steps in the environment.
* Furthermore, in this case the returned observation may not contain valid
image data and should be ignored.
Args:
action: The action to be executed.
Returns:
observation: numpy array, the observation following the action.
reward: float, the reward following the action.
is_terminal: bool, whether the environment has reached a terminal state.
This is true when a life is lost and terminal_on_life_loss, or when the
episode is over.
info: Gym API's info data structure.
"""
accumulated_reward = 0.
for time_step in range(self.frame_skip):
# We bypass the Gym observation altogether and directly fetch the
# grayscale image from the ALE. This is a little faster.
_, reward, game_over, info = self.environment.step(action)
accumulated_reward += reward
info['game_over'] = game_over
if self.terminal_on_life_loss:
new_lives = self.environment.ale.lives()
is_terminal = game_over or new_lives < self.lives
self.lives = new_lives
else:
is_terminal = game_over
if is_terminal:
break
# We max-pool over the last two frames, in grayscale.
elif time_step >= self.frame_skip - 2:
t = time_step - (self.frame_skip - 2)
self._fetch_grayscale_observation(self.screen_buffer[t])
# Pool the last two observations.
observation = self._pool_and_resize()
self.game_over = game_over
return observation, accumulated_reward, is_terminal, info
def _fetch_grayscale_observation(self, output):
"""Returns the current observation in grayscale.
The returned observation is stored in 'output'.
Args:
output: numpy array, screen buffer to hold the returned observation.
Returns:
observation: numpy array, the current observation in grayscale.
"""
self.environment.ale.getScreenGrayscale(output)
return output
def _pool_and_resize(self):
"""Transforms two frames into a Nature DQN observation.
For efficiency, the transformation is done in-place in self.screen_buffer.
Returns:
transformed_screen: numpy array, pooled, resized screen.
"""
# Pool if there are enough screens to do so.
if self.frame_skip > 1:
np.maximum(self.screen_buffer[0],
self.screen_buffer[1],
out=self.screen_buffer[0])
transformed_image = cv2.resize(self.screen_buffer[0],
(self.screen_size, self.screen_size),
interpolation=cv2.INTER_LINEAR)
int_image = np.asarray(transformed_image, dtype=np.uint8)
return np.expand_dims(int_image, axis=2)
class TimeLimit(gym.Wrapper):
def __init__(self, env, max_episode_steps=None):
super().__init__(env)
self._max_episode_steps = max_episode_steps
self._elapsed_steps = 0
def step(self, ac):
observation, reward, done, info = self.env.step(ac)
self._elapsed_steps += 1
if self._elapsed_steps >= self._max_episode_steps:
done = True
info['TimeLimit.truncated'] = True
return observation, reward, done, info
def reset(self, **kwargs):
self._elapsed_steps = 0
return self.env.reset(**kwargs)
class FrameStack(gym.Wrapper):
def __init__(self, env, k):
super().__init__(env)
self.k = k
self.frames = deque([], maxlen=k)
shp = env.observation_space.shape
self.observation_space = gym.spaces.Box(
low=0,
high=255,
shape=(shp[:-1] + (shp[-1] * k,)),
dtype=env.observation_space.dtype)
def reset(self):
ob = self.env.reset()
for _ in range(self.k):
self.frames.append(ob)
return self._get_ob()
def step(self, action):
ob, reward, done, info = self.env.step(action)
self.frames.append(ob)
return self._get_ob(), reward, done, info
def _get_ob(self):
assert len(self.frames) == self.k
return np.concatenate(self.frames, axis=-1)
class ImageToPyTorch(gym.ObservationWrapper):
def __init__(self, env):
super().__init__(env)
old_shape = self.observation_space.shape
self.observation_space = gym.spaces.Box(
low=0,
high=255,
shape=(old_shape[-1], old_shape[0], old_shape[1]),
dtype=np.uint8,
)
def observation(self, observation):
return np.transpose(observation, axes=(2, 0, 1))
def make_env(env, seed, terminal_on_life_loss):
# Also try with sticky actions, instead of random noops
env_id = f'{env}NoFrameskip-v4'
env = gym.make(env_id)
env.seed(seed)
# terminal_on_life_loss: see section 4.1 in https://arxiv.org/pdf/1812.06110.pdf
# use env.env to remove TimeLimit
env = AtariPreprocessing(env.env,
frame_skip=4,
max_random_noops=0,
terminal_on_life_loss=terminal_on_life_loss)
env = TimeLimit(env, max_episode_steps=27000)
env = FrameStack(env, k=4)
env = ImageToPyTorch(env)
return env
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment