| """ | |
| Simple policy gradient in Keras | |
| """ | |
| import gym | |
| import numpy as np | |
| from keras import layers | |
| from keras.models import Model | |
| from keras import backend as K | |
| from keras import utils as np_utils | |
| from keras import optimizers | |
| class Agent(object): | |
| def __init__(self, input_dim, output_dim, hidden_dims=[32, 32]): | |
| """Gym Playing Agent | |
| Args: | |
| input_dim (int): the dimension of state. | |
| Same as `env.observation_space.shape[0]` | |
| output_dim (int): the number of discrete actions | |
| Same as `env.action_space.n` | |
| hidden_dims (list): hidden dimensions | |
| Methods: | |
| private: | |
| __build_train_fn -> None | |
| It creates a train function | |
| It's similar to defining `train_op` in Tensorflow | |
| __build_network -> None | |
| It create a base model | |
| Its output is each action probability | |
| public: | |
| get_action(state) -> action | |
| fit(state, action, reward) -> None | |
| """ | |
| self.input_dim = input_dim | |
| self.output_dim = output_dim | |
| self.__build_network(input_dim, output_dim, hidden_dims) | |
| self.__build_train_fn() | |
| def __build_network(self, input_dim, output_dim, hidden_dims=[32, 32]): | |
| """Create a base network""" | |
| self.X = layers.Input(shape=(input_dim,)) | |
| net = self.X | |
| for h_dim in hidden_dims: | |
| net = layers.Dense(h_dim)(net) | |
| net = layers.Activation("relu")(net) | |
| net = layers.Dense(output_dim)(net) | |
| net = layers.Activation("softmax")(net) | |
| self.model = Model(inputs=self.X, outputs=net) | |
| def __build_train_fn(self): | |
| """Create a train function | |
| It replaces `model.fit(X, y)` because we use the output of model and use it for training. | |
| For example, we need action placeholder | |
| called `action_one_hot` that stores, which action we took at state `s`. | |
| Hence, we can update the same action. | |
| This function will create | |
| `self.train_fn([state, action_one_hot, discount_reward])` | |
| which would train the model. | |
| """ | |
| action_prob_placeholder = self.model.output | |
| action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim), | |
| name="action_onehot") | |
| discount_reward_placeholder = K.placeholder(shape=(None,), | |
| name="discount_reward") | |
| action_prob = K.sum(action_prob_placeholder * action_onehot_placeholder, axis=1) | |
| log_action_prob = K.log(action_prob) | |
| loss = - log_action_prob * discount_reward_placeholder | |
| loss = K.mean(loss) | |
| adam = optimizers.Adam() | |
| updates = adam.get_updates(params=self.model.trainable_weights, | |
| constraints=[], | |
| loss=loss) | |
| self.train_fn = K.function(inputs=[self.model.input, | |
| action_onehot_placeholder, | |
| discount_reward_placeholder], | |
| outputs=[], | |
| updates=updates) | |
| def get_action(self, state): | |
| """Returns an action at given `state` | |
| Args: | |
| state (1-D or 2-D Array): It can be either 1-D array of shape (state_dimension, ) | |
| or 2-D array shape of (n_samples, state_dimension) | |
| Returns: | |
| action: an integer action value ranging from 0 to (n_actions - 1) | |
| """ | |
| shape = state.shape | |
| if len(shape) == 1: | |
| assert shape == (self.input_dim,), "{} != {}".format(shape, self.input_dim) | |
| state = np.expand_dims(state, axis=0) | |
| elif len(shape) == 2: | |
| assert shape[1] == (self.input_dim), "{} != {}".format(shape, self.input_dim) | |
| else: | |
| raise TypeError("Wrong state shape is given: {}".format(state.shape)) | |
| action_prob = np.squeeze(self.model.predict(state)) | |
| assert len(action_prob) == self.output_dim, "{} != {}".format(len(action_prob), self.output_dim) | |
| return np.random.choice(np.arange(self.output_dim), p=action_prob) | |
| def fit(self, S, A, R): | |
| """Train a network | |
| Args: | |
| S (2-D Array): `state` array of shape (n_samples, state_dimension) | |
| A (1-D Array): `action` array of shape (n_samples,) | |
| It's simply a list of int that stores which actions the agent chose | |
| R (1-D Array): `reward` array of shape (n_samples,) | |
| A reward is given after each action. | |
| """ | |
| action_onehot = np_utils.to_categorical(A, num_classes=self.output_dim) | |
| discount_reward = compute_discounted_R(R) | |
| assert S.shape[1] == self.input_dim, "{} != {}".format(S.shape[1], self.input_dim) | |
| assert action_onehot.shape[0] == S.shape[0], "{} != {}".format(action_onehot.shape[0], S.shape[0]) | |
| assert action_onehot.shape[1] == self.output_dim, "{} != {}".format(action_onehot.shape[1], self.output_dim) | |
| assert len(discount_reward.shape) == 1, "{} != 1".format(len(discount_reward.shape)) | |
| self.train_fn([S, action_onehot, discount_reward]) | |
| def compute_discounted_R(R, discount_rate=.99): | |
| """Returns discounted rewards | |
| Args: | |
| R (1-D array): a list of `reward` at each time step | |
| discount_rate (float): Will discount the future value by this rate | |
| Returns: | |
| discounted_r (1-D array): same shape as input `R` | |
| but the values are discounted | |
| Examples: | |
| >>> R = [1, 1, 1] | |
| >>> compute_discounted_R(R, .99) # before normalization | |
| [1 + 0.99 + 0.99**2, 1 + 0.99, 1] | |
| """ | |
| discounted_r = np.zeros_like(R, dtype=np.float32) | |
| running_add = 0 | |
| for t in reversed(range(len(R))): | |
| running_add = running_add * discount_rate + R[t] | |
| discounted_r[t] = running_add | |

    # normalize to zero mean, unit variance (reduces gradient variance)
    discounted_r = (discounted_r - discounted_r.mean()) / discounted_r.std()

    return discounted_r


def run_episode(env, agent):
    """Returns an episode reward

    (1) Play until the game is done
    (2) The agent will choose an action according to the policy
    (3) When it's done, it will train from the game play

    Args:
        env (gym.env): Gym environment
        agent (Agent): Game Playing Agent

    Returns:
        total_reward (int): total reward earned during the whole episode
    """
    done = False
    S = []
    A = []
    R = []

    s = env.reset()

    total_reward = 0

    while not done:
        a = agent.get_action(s)
        s2, r, done, info = env.step(a)
        total_reward += r

        S.append(s)
        A.append(a)
        R.append(r)

        s = s2

        if done:
            S = np.array(S)
            A = np.array(A)
            R = np.array(R)

            agent.fit(S, A, R)

    return total_reward


def main():
    try:
        env = gym.make("CartPole-v0")
        input_dim = env.observation_space.shape[0]
        output_dim = env.action_space.n
        agent = Agent(input_dim, output_dim, [16, 16])

        for episode in range(2000):
            reward = run_episode(env, agent)
            print(episode, reward)

    finally:
        env.close()


if __name__ == '__main__':
    main()
```
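For readers tracing `__build_train_fn`: as far as I can tell, the custom loss is the standard REINFORCE objective averaged over the episode's time steps, where `G_t` is the discounted (and here normalized) return from `compute_discounted_R`:

```math
L(\theta) = -\frac{1}{T}\sum_{t=0}^{T-1} G_t \,\log \pi_\theta(a_t \mid s_t),
\qquad
\nabla_\theta L(\theta) = -\frac{1}{T}\sum_{t=0}^{T-1} G_t \,\nabla_\theta \log \pi_\theta(a_t \mid s_t)
```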
It's not the fastest policy gradient implementation, but it works.
It also makes me wonder how to make it less noisy.
Here is a graph from another implementation: https://github.com/nyck33/reinforcement-learning/blob/master/2-cartpole/1-dqn/cartpole_dqn.py
In your link "https://github.com/nyck33/reinforcement-learning/blob/master/2-cartpole/1-dqn/cartpole_dqn.py" it's not policy gradients used
- The noise can be inherent to the environment, if stochastic
- actor critic can eliminate noise because the gradient is calculated using a value network rather than (high-variance) returns of the environment
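To make that concrete, here is a minimal sketch (my own, not part of the gist) of adding a learned value baseline: a small critic V(s) is fit to the discounted returns, and the policy update then uses the advantage `G_t - V(s_t)` instead of the raw return. The helper names `build_value_model` and `compute_advantages` are hypothetical.

```python
# Sketch only: a learned baseline to reduce gradient variance.
# Assumes the same Keras version as the gist; helper names are hypothetical.
from keras import layers
from keras.models import Model


def build_value_model(input_dim, hidden_dim=32):
    """V(s): a small critic that predicts the discounted return from a state."""
    x = layers.Input(shape=(input_dim,))
    h = layers.Dense(hidden_dim, activation="relu")(x)
    v = layers.Dense(1)(h)
    model = Model(inputs=x, outputs=v)
    model.compile(optimizer="adam", loss="mse")
    return model


def compute_advantages(value_model, S, discounted_R):
    """Advantage estimate G_t - V(s_t): lower variance than G_t alone."""
    baseline = value_model.predict(S).squeeze(axis=1)
    advantages = discounted_R - baseline
    # move the critic toward the observed returns for future episodes
    value_model.fit(S, discounted_R.reshape(-1, 1), verbose=0)
    return advantages


# Usage inside Agent.fit, replacing the raw discounted returns:
#   advantages = compute_advantages(value_model, S, compute_discounted_R(R))
#   self.train_fn([S, action_onehot, advantages])
```

A full actor-critic would additionally bootstrap from V(s') at each step; this baseline version is the smallest change to the gist's structure.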
I copied your code, but when I tried to run it I got the following error in get_updates():
`TypeError: get_updates() got an unexpected keyword argument 'constraints'`. Why might this be?
I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run.
- The code as it is here throws a `TypeError: get_updates() got an unexpected keyword argument 'constraints'`. I simply commented out the `constraints=[],` at line 93.
- Then I got `ValueError: Cannot create a Keras backend function with updates but no outputs during eager execution.` To disable eager execution I imported tensorflow and called `tf.compat.v1.disable_eager_execution()`.
- Then it throws `IndexError: list index out of range` because of `outputs=[],` (line 99). Just change it to `outputs=[self.model.output],` and it runs fine for me.
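For anyone hitting the same errors, here is a sketch of `__build_train_fn` with those three changes applied. It assumes a newer keras/tf combination where `Optimizer.get_updates()` no longer accepts `constraints` and where eager execution must be disabled before building a `K.function` with updates; it is not the original gist code.

```python
# Patched __build_train_fn (paste over Agent.__build_train_fn) for newer keras/tf.
import tensorflow as tf
from keras import backend as K
from keras import optimizers

tf.compat.v1.disable_eager_execution()  # K.function with `updates` needs graph mode


def __build_train_fn(self):
    action_onehot_placeholder = K.placeholder(shape=(None, self.output_dim),
                                              name="action_onehot")
    discount_reward_placeholder = K.placeholder(shape=(None,),
                                                name="discount_reward")

    action_prob = K.sum(self.model.output * action_onehot_placeholder, axis=1)
    loss = K.mean(-K.log(action_prob) * discount_reward_placeholder)

    adam = optimizers.Adam()
    # `constraints` was removed from get_updates() in later Keras versions
    updates = adam.get_updates(loss=loss, params=self.model.trainable_weights)

    self.train_fn = K.function(inputs=[self.model.input,
                                       action_onehot_placeholder,
                                       discount_reward_placeholder],
                               outputs=[self.model.output],  # must be non-empty here
                               updates=updates)
```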
> I'm probably using a different version of keras/tf, but I had to fix a couple of things to make the code run. […]
thank you :D


I think the implementation is correct, but I'm not sure.