""" A clean implementation of DDPG algorithm
Continuous control with deep reinforcement learning (Lillicrap et al., 2015)
"""
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import gym
import numpy as np
import tensorflow as tf


class AttrDict(dict):
    def __init__(self, **kwargs):
        self.__dict__.update(**kwargs)

    def __getattr__(self, key):
        return self[key]

    def __setattr__(self, key, value):
        self[key] = value


class ReplayBuffer():
    """
    FIFO replay buffer
    """
    def __init__(self, s_dim, a_dim, max_size):
        self.s = np.zeros((max_size, s_dim), np.float32)
        self.a = np.zeros((max_size, a_dim), np.float32)
        self.next_s = np.zeros((max_size, s_dim), np.float32)
        self.r = np.zeros(max_size, np.float32)
        self.d = np.zeros(max_size, np.float32)
        self.idx, self.size, self.max_size = 0, 0, max_size

    def add(self, s, a, r, next_s, d):
        self.s[self.idx] = s
        self.a[self.idx] = a
        self.next_s[self.idx] = next_s
        self.r[self.idx] = r
        self.d[self.idx] = d
        self.idx = (self.idx + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def __len__(self):
        return self.size

    def get_batch(self, batch_size=32):
        indices = np.random.randint(0, self.size, batch_size)
        return (self.s[indices],
                self.a[indices],
                self.next_s[indices],
                self.r[indices],
                self.d[indices])
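
# Usage sketch (illustrative only, not part of the training loop): once
# max_size transitions are stored, add() overwrites the oldest entries FIFO,
# and get_batch() samples uniformly with replacement from what is stored:
#   buf = ReplayBuffer(s_dim=3, a_dim=1, max_size=1000)
#   buf.add(s, a, r, next_s, d)
#   b_s, b_a, b_next_s, b_r, b_d = buf.get_batch(batch_size=32)
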
def mlp(X, hid_sizes, activation, output_activation):
    for h in hid_sizes[:-1]:
        X = tf.layers.dense(X, h, activation=activation)
    return tf.layers.dense(X, hid_sizes[-1], activation=output_activation)
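
# Note: mlp builds a fully connected stack; every size in hid_sizes[:-1] is a
# hidden layer with `activation`, and hid_sizes[-1] is the output layer with
# `output_activation` (None means a linear output, as used for the Q heads).
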
class ActorCritic():
    def __init__(self, config, S, A, a_dim, a_high, mode='agent'):
        with tf.variable_scope(mode):
            with tf.variable_scope('policy'):
                self.policy = a_high * mlp(S,
                                           config.hid_sizes + [a_dim],
                                           config.activation,
                                           config.output_activation)
            with tf.variable_scope('q'):
                # squeeze so Q-values have shape (batch,), matching the r and
                # d placeholders when the Bellman target is formed
                self.q_from_policy = tf.squeeze(
                    mlp(tf.concat([S, self.policy], axis=-1),
                        config.hid_sizes + [1],
                        config.activation,
                        None),
                    axis=1)
            if mode == 'agent':
                with tf.variable_scope('q', reuse=True):
                    self.q = tf.squeeze(
                        mlp(tf.concat([S, A], axis=-1),
                            config.hid_sizes + [1],
                            config.activation,
                            None),
                        axis=1)
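
# The 'agent' network exposes both Q(s, pi(s)) for the policy loss and, with
# reuse=True so the weights are shared, Q(s, a) for the critic loss. The
# 'target' copy only evaluates its own policy on the next state, so it skips
# the Q(s, a) head (A is passed as None).
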
def get_vars(scope):
    return [var for var in tf.global_variables() if var.name.startswith(scope)]


class DDPG():
    def __init__(self, env, config):
        self.env = env
        self.s_dim = self.env.observation_space.shape[0]
        self.a_dim = self.env.action_space.shape[0]
        self.a_high = self.env.action_space.high[0]
        self.a_noise = self.a_high / 10
        self.config = config
        self.step = 0
        self.replay_buffer = ReplayBuffer(
            self.s_dim, self.a_dim, self.config.replay_size)
        self.sess = tf.Session()
        self.build()

    def _create_placeholder(self, name, dim=None):
        return tf.placeholder(tf.float32,
                              shape=(None, dim) if dim else (None,),
                              name=name)

    def create_placeholders(self):
        self.inputs = {}
        dims = {'s': self.s_dim,
                'next_s': self.s_dim,
                'a': self.a_dim,
                'r': None,
                'd': None}
        for key in dims:
            self.inputs[key] = self._create_placeholder(key, dims[key])

    def create_model(self):
        self.agent = ActorCritic(self.config,
                                 self.inputs['s'],
                                 self.inputs['a'],
                                 self.a_dim,
                                 self.a_high,
                                 'agent')
        self.target = ActorCritic(self.config,
                                  self.inputs['next_s'],
                                  None,
                                  self.a_dim,
                                  self.a_high,
                                  'target')
        estimated_q = self.inputs['r'] + self.config.discount * \
            (1 - self.inputs['d']) * self.target.q_from_policy
        y = tf.stop_gradient(estimated_q)
        self.policy_loss = -tf.reduce_mean(self.agent.q_from_policy)
        self.q_loss = tf.reduce_mean((y - self.agent.q) ** 2)
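
    # The critic regresses onto the one-step Bellman target built from the
    # target network, y = r + discount * (1 - d) * Q_target(s', pi_target(s')),
    # where d masks out the bootstrap term at terminal states. stop_gradient
    # keeps the target network out of the critic's gradients, and the actor is
    # trained to maximize Q(s, pi(s)), written here as minimizing its negative.
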
    def optimize(self):
        self.p_algo = tf.train.AdamOptimizer(
            learning_rate=self.config.policy_lr)
        self.policy_opt = self.p_algo.minimize(
            self.policy_loss, var_list=get_vars('agent/policy'))
        self.q_algo = tf.train.AdamOptimizer(learning_rate=self.config.q_lr)
        self.q_opt = self.q_algo.minimize(self.q_loss,
                                          var_list=get_vars('agent/q'))
        self.target_update = tf.group(
            [tf.assign(target_v,
                       self.config.tau * agent_v +
                       (1 - self.config.tau) * target_v)
             for target_v, agent_v in zip(get_vars('target'),
                                          get_vars('agent'))])
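
    # The target network tracks the agent by Polyak averaging: after every
    # update, theta_target <- tau * theta_agent + (1 - tau) * theta_target,
    # so the Bellman targets move slowly and stabilize Q-learning.
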
    def create_summary(self):
        self.writer = tf.summary.FileWriter('outputs/graphs',
                                            tf.get_default_graph())
        self.summary = tf.summary.merge((tf.summary.scalar('policy_loss',
                                                           self.policy_loss),
                                         tf.summary.scalar('q_loss',
                                                           self.q_loss)))

    def build(self):
        self.create_placeholders()
        self.create_model()
        self.optimize()
        self.create_summary()

    def initialize_ddpg(self):
        self.sess.run(tf.global_variables_initializer())
        init_target_with_agent = [
            tf.assign(target_v, agent_v)
            for target_v, agent_v in zip(get_vars('target'), get_vars('agent'))]
        self.sess.run(init_target_with_agent)

    def get_action(self, s, noise_scale):
        a = self.sess.run(self.agent.policy,
                          feed_dict={self.inputs['s']: s.reshape(1, -1)})[0]
        return np.clip(a + noise_scale * np.random.randn(self.a_dim),
                       -self.a_high,
                       self.a_high)
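
    # Exploration: the deterministic policy output is perturbed with Gaussian
    # noise (scaled to a_high / 10 via self.a_noise) and clipped back to the
    # action bounds. The original paper used Ornstein-Uhlenbeck noise;
    # uncorrelated Gaussian noise is a common, simpler substitute.
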
    def train(self):
        self.initialize_ddpg()
        for i in range(self.config.n_epochs):
            print('EPOCH', i)
            s, r, d, ep_r, ep_len = self.env.reset(), 0, False, 0, 0
            while (not d) and ep_len < self.config.max_ep_len:
                a = self.get_action(s, self.a_noise)
                next_s, r, d, _ = self.env.step(a)
                ep_r += r
                ep_len += 1
                # do not treat hitting the time limit as a true terminal state
                d = False if ep_len == self.config.max_ep_len else d
                self.replay_buffer.add(s, a, r, next_s, d)
                s = next_s

            total_q_loss, total_p_loss = 0, 0
            n_updates = min(len(self.replay_buffer),
                            self.config.steps_per_epoch)
            for _ in range(n_updates):
                b_s, b_a, b_next_s, b_r, b_d = self.replay_buffer.get_batch(
                    self.config.batch_size)
                feed_dict = {self.inputs['s']: b_s,
                             self.inputs['a']: b_a,
                             self.inputs['r']: b_r,
                             self.inputs['next_s']: b_next_s,
                             self.inputs['d']: b_d}
                q_loss, _, summaries = self.sess.run([self.q_loss,
                                                      self.q_opt,
                                                      self.summary],
                                                     feed_dict=feed_dict)
                policy_loss, _, _ = self.sess.run([self.policy_loss,
                                                   self.policy_opt,
                                                   self.target_update],
                                                  feed_dict=feed_dict)
                total_q_loss += q_loss
                total_p_loss += policy_loss
                self.step += 1
                self.writer.add_summary(summaries, global_step=self.step)
            print('policy_loss: %.2f. q_loss: %.2f' % (
                total_p_loss / max(n_updates, 1),
                total_q_loss / max(n_updates, 1)))
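
    # Each epoch collects one episode of experience, then performs up to
    # steps_per_epoch gradient updates sampled from the replay buffer: a
    # critic step, then an actor step together with the target update.
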
def get_config():
    tau = 0.005
    discount = 0.99
    hid_sizes = [400, 300]
    activation = tf.nn.relu
    output_activation = tf.tanh
    policy_lr = 1e-4
    q_lr = 1e-3
    batch_size = 32
    steps_per_epoch = 100
    max_ep_len = 500
    n_epochs = 10
    replay_size = 10000
    return AttrDict(**locals())
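
# These are small-scale defaults for a quick MountainCarContinuous experiment;
# the replay size, noise scale, and number of epochs typically need to be much
# larger for harder continuous-control tasks.
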
def main():
    config = get_config()
    env = gym.make('MountainCarContinuous-v0')
    algo = DDPG(env, config)
    algo.train()


if __name__ == '__main__':
    main()
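
# To run (assuming TensorFlow 1.x and a gym version whose step() returns the
# 4-tuple (next_s, r, d, info)), save this file as e.g. ddpg.py and execute:
#   python ddpg.py
# TensorBoard summaries are written to outputs/graphs; view them with:
#   tensorboard --logdir outputs/graphs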