""" A clean implementation of DDPG algorithm | |
Continuous control with deep reinforcement learning (Lillicrap et al., 2015) | |
""" | |
import os | |
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' | |
import gym | |
import numpy as np | |
import tensorflow as tf | |
class AttrDict(dict):
    def __init__(self, **kwargs):
        self.__dict__.update(**kwargs)

    def __getattr__(self, key):
        return self[key]

    def __setattr__(self, key, value):
        self[key] = value

class ReplayBuffer():
    """
    FIFO replay buffer
    """
    def __init__(self, s_dim, a_dim, max_size):
        self.s = np.zeros((max_size, s_dim), np.float32)
        self.a = np.zeros((max_size, a_dim), np.float32)
        self.next_s = np.zeros((max_size, s_dim), np.float32)
        self.r = np.zeros(max_size, np.float32)
        self.d = np.zeros(max_size, np.float32)
        self.idx, self.size, self.max_size = 0, 0, max_size

    def add(self, s, a, r, next_s, d):
        self.s[self.idx] = s
        self.a[self.idx] = a
        self.next_s[self.idx] = next_s
        self.r[self.idx] = r
        self.d[self.idx] = d
        self.idx = (self.idx + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def __len__(self):
        return self.size

    def get_batch(self, batch_size=32):
        indices = np.random.randint(0, self.size, batch_size)
        return (self.s[indices],
                self.a[indices],
                self.next_s[indices],
                self.r[indices],
                self.d[indices])

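# Note (added): once `max_size` is reached the buffer overwrites its oldest
# transitions, and `get_batch` samples indices uniformly (with replacement)
# from the filled portion only, so training can start before the buffer is full.
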
def mlp(X, hid_sizes, activation, output_activation):
    for h in hid_sizes[:-1]:
        X = tf.layers.dense(X, h, activation=activation)
    return tf.layers.dense(X, hid_sizes[-1], activation=output_activation)

class ActorCritic():
    def __init__(self, config, S, A, a_dim, a_high, mode='agent'):
        with tf.variable_scope(mode):
            with tf.variable_scope('policy'):
                # Deterministic policy: tanh output scaled to the action bound.
                self.policy = a_high * mlp(S,
                                           config.hid_sizes + [a_dim],
                                           config.activation,
                                           config.output_activation)
            with tf.variable_scope('q'):
                # Q(s, policy(s)); squeeze to shape (batch,) so it broadcasts
                # correctly against the (batch,)-shaped rewards and done flags.
                self.q_from_policy = tf.squeeze(
                    mlp(tf.concat([S, self.policy], axis=-1),
                        config.hid_sizes + [1],
                        config.activation,
                        None),
                    axis=1)
            if mode == 'agent':
                with tf.variable_scope('q', reuse=True):
                    # Q(s, a) on replayed actions, sharing weights with the
                    # 'q' scope above.
                    self.q = tf.squeeze(
                        mlp(tf.concat([S, A], axis=-1),
                            config.hid_sizes + [1],
                            config.activation,
                            None),
                        axis=1)

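# Note (added): the 'q' scope is entered twice with reuse=True, so q_from_policy
# (Q of the state and the network's own action, used for the policy loss) and
# q (Q of the state and a replayed action, used for the critic loss) are the
# same network evaluated on different inputs. The target copy only ever needs
# Q(s', policy(s')), hence A=None and no reused head in 'target' mode.
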
def get_vars(scope):
    return [var for var in tf.global_variables() if var.name.startswith(scope)]

class DDPG():
    def __init__(self, env, config):
        self.env = env
        self.s_dim = self.env.observation_space.shape[0]
        self.a_dim = self.env.action_space.shape[0]
        self.a_high = self.env.action_space.high[0]
        self.a_noise = self.a_high / 10
        self.config = config
        self.step = 0
        self.replay_buffer = ReplayBuffer(
            self.s_dim, self.a_dim, self.config.replay_size)
        self.sess = tf.Session()
        self.build()

    def _create_placeholder(self, name, dim=None):
        return tf.placeholder(tf.float32,
                              shape=(None, dim) if dim else (None,),
                              name=name)

    def create_placeholders(self):
        self.inputs = {}
        dims = {'s': self.s_dim,
                'next_s': self.s_dim,
                'a': self.a_dim,
                'r': None,
                'd': None}
        for key in dims:
            self.inputs[key] = self._create_placeholder(key, dims[key])

    def create_model(self):
        self.agent = ActorCritic(self.config,
                                 self.inputs['s'],
                                 self.inputs['a'],
                                 self.a_dim,
                                 self.a_high,
                                 'agent')
        self.target = ActorCritic(self.config,
                                  self.inputs['next_s'],
                                  None,
                                  self.a_dim,
                                  self.a_high,
                                  'target')
        # Bellman backup: y = r + discount * (1 - d) * Q_target(s', policy_target(s')).
        # stop_gradient keeps the critic update from backpropagating into the target.
        estimated_q = self.inputs['r'] + self.config.discount * \
            (1 - self.inputs['d']) * self.target.q_from_policy
        y = tf.stop_gradient(estimated_q)
        self.policy_loss = -tf.reduce_mean(self.agent.q_from_policy)
        self.q_loss = tf.reduce_mean((y - self.agent.q) ** 2)

    def optimize(self):
        self.p_algo = tf.train.AdamOptimizer(
            learning_rate=self.config.policy_lr)
        self.policy_opt = self.p_algo.minimize(
            self.policy_loss, var_list=get_vars('agent/policy'))
        self.q_algo = tf.train.AdamOptimizer(learning_rate=self.config.q_lr)
        self.q_opt = self.q_algo.minimize(self.q_loss,
                                          var_list=get_vars('agent/q'))
        self.target_update = tf.group(
            [tf.assign(target_v,
                       self.config.tau * agent_v + (1 - self.config.tau) * target_v)
             for target_v, agent_v in zip(get_vars('target'), get_vars('agent'))])

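    # Note (added): the target network tracks the agent by Polyak averaging,
    # target_v <- tau * agent_v + (1 - tau) * target_v, applied to every matching
    # variable pair. The actor and critic get separate Adam optimizers, and each
    # minimize() call restricts its var_list so the policy loss only updates
    # 'agent/policy' weights even though it depends on the shared Q network.
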
    def create_summary(self):
        self.writer = tf.summary.FileWriter('outputs/graphs',
                                            tf.get_default_graph())
        self.summary = tf.summary.merge((tf.summary.scalar('policy_loss',
                                                           self.policy_loss),
                                         tf.summary.scalar('q_loss',
                                                           self.q_loss)))

    def build(self):
        self.create_placeholders()
        self.create_model()
        self.optimize()
        self.create_summary()

    def initialize_ddpg(self):
        self.sess.run(tf.global_variables_initializer())
        # Start the target network as an exact copy of the agent network.
        init_target_with_agent = [tf.assign(target_v, agent_v) for
                                  target_v, agent_v in zip(get_vars('target'),
                                                           get_vars('agent'))]
        self.sess.run(init_target_with_agent)

    def get_action(self, s, noise_scale):
        a = self.sess.run(self.agent.policy,
                          feed_dict={self.inputs['s']: s.reshape(1, -1)})[0]
        return np.clip(a + noise_scale * np.random.randn(self.a_dim),
                       -self.a_high,
                       self.a_high)

    def train(self):
        self.initialize_ddpg()
        for i in range(self.config.n_epochs):
            print('EPOCH', i)
            s, r, d, ep_r, ep_len = self.env.reset(), 0, False, 0, 0
            while (not d) and ep_len < self.config.max_ep_len:
                a = self.get_action(s, self.a_noise)
                next_s, r, d, _ = self.env.step(a)
                ep_r += r
                ep_len += 1
                # Do not treat hitting the time limit as a true terminal state.
                d = False if ep_len == self.config.max_ep_len else d
                self.replay_buffer.add(s, a, r, next_s, d)
                if (ep_len % 100) == 0:
                    print(ep_len, a)
                s = next_s

            total_q_loss, total_p_loss = 0, 0
            n_updates = min(len(self.replay_buffer), self.config.steps_per_epoch)
            for _ in range(n_updates):
                b_s, b_a, b_next_s, b_r, b_d = self.replay_buffer.get_batch(
                    self.config.batch_size)
                feed_dict = {self.inputs['s']: b_s,
                             self.inputs['a']: b_a,
                             self.inputs['r']: b_r,
                             self.inputs['next_s']: b_next_s,
                             self.inputs['d']: b_d}
                q_loss, _, summaries = self.sess.run([self.q_loss,
                                                      self.q_opt,
                                                      self.summary],
                                                     feed_dict=feed_dict)
                policy_loss, _, _ = self.sess.run([self.policy_loss,
                                                   self.policy_opt,
                                                   self.target_update],
                                                  feed_dict=feed_dict)
                total_q_loss += q_loss
                total_p_loss += policy_loss
                self.step += 1
                self.writer.add_summary(summaries, global_step=self.step)
            print('ep_reward: %.2f. policy_loss: %.2f. q_loss: %.2f' % (
                ep_r,
                total_p_loss / max(n_updates, 1),
                total_q_loss / max(n_updates, 1)))

def get_config():
    tau = 0.005
    discount = 0.99
    hid_sizes = [400, 300]
    activation = tf.nn.relu
    output_activation = tf.tanh
    policy_lr = 1e-4
    q_lr = 1e-3
    batch_size = 32
    steps_per_epoch = 100
    max_ep_len = 500
    n_epochs = 10
    replay_size = 10000
    # AttrDict only accepts keyword arguments, so unpack locals().
    return AttrDict(**locals())

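# Note (added): these hyperparameters look scaled down for a quick demo run
# (small replay buffer, 10 epochs, 100 gradient steps per epoch); for serious
# training you would likely want a much larger buffer and many more epochs.
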
def main():
    config = get_config()
    env = gym.make('MountainCarContinuous-v0')
    algo = DDPG(env, config)
    algo.train()


if __name__ == '__main__':
    main()
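
# Usage (added, assuming the file is saved as ddpg.py):
#   python ddpg.py
# Summaries are written to outputs/graphs and can be viewed with
#   tensorboard --logdir outputs/graphs
# Any gym environment with a Box action space (e.g. 'Pendulum-v0') can be
# swapped in for MountainCarContinuous-v0 in main().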