Created
June 15, 2020 21:25
-
-
Save denisyarats/057c704a2816e48e8faebff073b864b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
# Copyright 2019 The SEED Authors | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""A class implementing minimal Atari 2600 preprocessing. | |
Adapted from Dopamine. | |
""" | |
from gym.spaces.box import Box | |
import numpy as np | |
import gym | |
from collections import deque | |
import cv2 | |
class AtariPreprocessing(object): | |
"""A class implementing image preprocessing for Atari 2600 agents. | |
Specifically, this provides the following subset from the JAIR paper | |
(Bellemare et al., 2013) and Nature DQN paper (Mnih et al., 2015): | |
* Frame skipping (defaults to 4). | |
* Terminal signal when a life is lost (off by default). | |
* Grayscale and max-pooling of the last two frames. | |
* Downsample the screen to a square image (defaults to 84x84). | |
More generally, this class follows the preprocessing guidelines set down in | |
Machado et al. (2018), "Revisiting the Arcade Learning Environment: | |
Evaluation Protocols and Open Problems for General Agents". | |
It also provides random starting no-ops, which are used in the Rainbow, Apex | |
and R2D2 papers. | |
""" | |
def __init__(self, | |
environment, | |
frame_skip=4, | |
terminal_on_life_loss=False, | |
screen_size=84, | |
max_random_noops=0): | |
"""Constructor for an Atari 2600 preprocessor. | |
Args: | |
environment: Gym environment whose observations are preprocessed. | |
frame_skip: int, the frequency at which the agent experiences the game. | |
terminal_on_life_loss: bool, If True, the step() method returns | |
is_terminal=True whenever a life is lost. See Mnih et al. 2015. | |
screen_size: int, size of a resized Atari 2600 frame. | |
max_random_noops: int, maximum number of no-ops to apply at the beginning | |
of each episode to reduce determinism. These no-ops are applied at a | |
low-level, before frame skipping. | |
Raises: | |
ValueError: if frame_skip or screen_size are not strictly positive. | |
""" | |
if frame_skip <= 0: | |
raise ValueError( | |
'Frame skip should be strictly positive, got {}'.format( | |
frame_skip)) | |
if screen_size <= 0: | |
raise ValueError( | |
'Target screen size should be strictly positive, got {}'. | |
format(screen_size)) | |
self.environment = environment | |
self.terminal_on_life_loss = terminal_on_life_loss | |
self.frame_skip = frame_skip | |
self.screen_size = screen_size | |
self.max_random_noops = max_random_noops | |
obs_dims = self.environment.observation_space | |
# Stores temporary observations used for pooling over two successive | |
# frames. | |
self.screen_buffer = [ | |
np.empty((obs_dims.shape[0], obs_dims.shape[1]), dtype=np.uint8), | |
np.empty((obs_dims.shape[0], obs_dims.shape[1]), dtype=np.uint8) | |
] | |
self.game_over = False | |
self.lives = 0 # Will need to be set by reset(). | |
@property | |
def observation_space(self): | |
# Return the observation space adjusted to match the shape of the processed | |
# observations. | |
return Box(low=0, | |
high=255, | |
shape=(self.screen_size, self.screen_size, 1), | |
dtype=np.uint8) | |
@property | |
def action_space(self): | |
return self.environment.action_space | |
@property | |
def reward_range(self): | |
return self.environment.reward_range | |
@property | |
def metadata(self): | |
return self.environment.metadata | |
def close(self): | |
return self.environment.close() | |
def apply_random_noops(self): | |
"""Steps self.environment with random no-ops.""" | |
if self.max_random_noops <= 0: | |
return | |
# Other no-ops implementations actually always do at least 1 no-op. We | |
# follow them. | |
no_ops = self.environment.np_random.randint(1, | |
self.max_random_noops + 1) | |
for _ in range(no_ops): | |
_, _, game_over, _ = self.environment.step(0) | |
if game_over: | |
self.environment.reset() | |
def reset(self): | |
"""Resets the environment. | |
Returns: | |
observation: numpy array, the initial observation emitted by the | |
environment. | |
""" | |
self.environment.reset() | |
self.apply_random_noops() | |
self.lives = self.environment.ale.lives() | |
self._fetch_grayscale_observation(self.screen_buffer[0]) | |
self.screen_buffer[1].fill(0) | |
return self._pool_and_resize() | |
def render(self, mode): | |
"""Renders the current screen, before preprocessing. | |
This calls the Gym API's render() method. | |
Args: | |
mode: Mode argument for the environment's render() method. | |
Valid values (str) are: | |
'rgb_array': returns the raw ALE image. | |
'human': renders to display via the Gym renderer. | |
Returns: | |
if mode='rgb_array': numpy array, the most recent screen. | |
if mode='human': bool, whether the rendering was successful. | |
""" | |
return self.environment.render(mode) | |
def step(self, action): | |
"""Applies the given action in the environment. | |
Remarks: | |
* If a terminal state (from life loss or episode end) is reached, this may | |
execute fewer than self.frame_skip steps in the environment. | |
* Furthermore, in this case the returned observation may not contain valid | |
image data and should be ignored. | |
Args: | |
action: The action to be executed. | |
Returns: | |
observation: numpy array, the observation following the action. | |
reward: float, the reward following the action. | |
is_terminal: bool, whether the environment has reached a terminal state. | |
This is true when a life is lost and terminal_on_life_loss, or when the | |
episode is over. | |
info: Gym API's info data structure. | |
""" | |
accumulated_reward = 0. | |
for time_step in range(self.frame_skip): | |
# We bypass the Gym observation altogether and directly fetch the | |
# grayscale image from the ALE. This is a little faster. | |
_, reward, game_over, info = self.environment.step(action) | |
accumulated_reward += reward | |
info['game_over'] = game_over | |
if self.terminal_on_life_loss: | |
new_lives = self.environment.ale.lives() | |
is_terminal = game_over or new_lives < self.lives | |
self.lives = new_lives | |
else: | |
is_terminal = game_over | |
if is_terminal: | |
break | |
# We max-pool over the last two frames, in grayscale. | |
elif time_step >= self.frame_skip - 2: | |
t = time_step - (self.frame_skip - 2) | |
self._fetch_grayscale_observation(self.screen_buffer[t]) | |
# Pool the last two observations. | |
observation = self._pool_and_resize() | |
self.game_over = game_over | |
return observation, accumulated_reward, is_terminal, info | |
def _fetch_grayscale_observation(self, output): | |
"""Returns the current observation in grayscale. | |
The returned observation is stored in 'output'. | |
Args: | |
output: numpy array, screen buffer to hold the returned observation. | |
Returns: | |
observation: numpy array, the current observation in grayscale. | |
""" | |
self.environment.ale.getScreenGrayscale(output) | |
return output | |
def _pool_and_resize(self): | |
"""Transforms two frames into a Nature DQN observation. | |
For efficiency, the transformation is done in-place in self.screen_buffer. | |
Returns: | |
transformed_screen: numpy array, pooled, resized screen. | |
""" | |
# Pool if there are enough screens to do so. | |
if self.frame_skip > 1: | |
np.maximum(self.screen_buffer[0], | |
self.screen_buffer[1], | |
out=self.screen_buffer[0]) | |
transformed_image = cv2.resize(self.screen_buffer[0], | |
(self.screen_size, self.screen_size), | |
interpolation=cv2.INTER_LINEAR) | |
int_image = np.asarray(transformed_image, dtype=np.uint8) | |
return np.expand_dims(int_image, axis=2) | |
class TimeLimit(gym.Wrapper): | |
def __init__(self, env, max_episode_steps=None): | |
super().__init__(env) | |
self._max_episode_steps = max_episode_steps | |
self._elapsed_steps = 0 | |
def step(self, ac): | |
observation, reward, done, info = self.env.step(ac) | |
self._elapsed_steps += 1 | |
if self._elapsed_steps >= self._max_episode_steps: | |
done = True | |
info['TimeLimit.truncated'] = True | |
return observation, reward, done, info | |
def reset(self, **kwargs): | |
self._elapsed_steps = 0 | |
return self.env.reset(**kwargs) | |
class FrameStack(gym.Wrapper): | |
def __init__(self, env, k): | |
super().__init__(env) | |
self.k = k | |
self.frames = deque([], maxlen=k) | |
shp = env.observation_space.shape | |
self.observation_space = gym.spaces.Box( | |
low=0, | |
high=255, | |
shape=(shp[:-1] + (shp[-1] * k,)), | |
dtype=env.observation_space.dtype) | |
def reset(self): | |
ob = self.env.reset() | |
for _ in range(self.k): | |
self.frames.append(ob) | |
return self._get_ob() | |
def step(self, action): | |
ob, reward, done, info = self.env.step(action) | |
self.frames.append(ob) | |
return self._get_ob(), reward, done, info | |
def _get_ob(self): | |
assert len(self.frames) == self.k | |
return np.concatenate(self.frames, axis=-1) | |
class ImageToPyTorch(gym.ObservationWrapper): | |
def __init__(self, env): | |
super().__init__(env) | |
old_shape = self.observation_space.shape | |
self.observation_space = gym.spaces.Box( | |
low=0, | |
high=255, | |
shape=(old_shape[-1], old_shape[0], old_shape[1]), | |
dtype=np.uint8, | |
) | |
def observation(self, observation): | |
return np.transpose(observation, axes=(2, 0, 1)) | |
def make_env(env, seed, terminal_on_life_loss): | |
# Also try with sticky actions, instead of random noops | |
env_id = f'{env}NoFrameskip-v4' | |
env = gym.make(env_id) | |
env.seed(seed) | |
# terminal_on_life_loss: see section 4.1 in https://arxiv.org/pdf/1812.06110.pdf | |
# use env.env to remove TimeLimit | |
env = AtariPreprocessing(env.env, | |
frame_skip=4, | |
max_random_noops=0, | |
terminal_on_life_loss=terminal_on_life_loss) | |
env = TimeLimit(env, max_episode_steps=27000) | |
env = FrameStack(env, k=4) | |
env = ImageToPyTorch(env) | |
return env |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment