Implementing Deep Reinforcement Learning Model Optimization in Python with the DDPG Algorithm
Introduction
Deep Deterministic Policy Gradient (DDPG) is an off-policy actor-critic algorithm for continuous action spaces. This article walks through a compact TensorFlow implementation step by step and trains it on the Pendulum-v1 environment from gym.
Overview of the DDPG Algorithm
DDPG builds on three key ideas (the corresponding update rules are summarized right after this list):
- Actor-critic architecture: the actor network outputs actions, while the critic network estimates the value of those actions.
- Target networks: slowly updated copies of the actor and critic are used to stabilize training.
- Experience replay: sampling past transitions from a replay buffer reduces the correlation between consecutive training samples.
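These ideas correspond to the standard DDPG update rules, summarized here in generic notation (Q is the critic, μ the actor, primed symbols denote target networks, d_i the terminal flag, N the batch size); the code in the following steps implements exactly these updates:

y_i = r_i + γ · (1 − d_i) · Q′(s_{i+1}, μ′(s_{i+1}))    (TD target built from the target networks)
L_critic = (1/N) · Σ_i (y_i − Q(s_i, a_i))²             (mean squared TD error)
L_actor  = −(1/N) · Σ_i Q(s_i, μ(s_i))                  (push the actor toward high-value actions)
θ′ ← τ · θ + (1 − τ) · θ′                               (soft update of each target network)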
Implementation Steps
1. Environment Setup
First, make sure the required Python libraries are installed, such as TensorFlow and gym.
!pip install tensorflow gym
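Optionally, you can verify the installed versions, since the gym API changed in version 0.26 (env.reset now returns (obs, info) and env.step returns five values); the training loop in step 6 assumes the newer API:

import tensorflow as tf
import gym

print(tf.__version__)   # any recent TensorFlow 2.x release should work
print(gym.__version__)  # the code in step 6 assumes gym >= 0.26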
2. Import Libraries
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import gym
import random
from collections import deque
3. Define the DDPG Model (Actor and Critic)
class Actor:
    """Deterministic policy network: maps a state to a continuous action."""

    def __init__(self, state_dim, action_dim, action_bound):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.action_bound = action_bound
        self.model = self.create_model()
        self.opt = tf.keras.optimizers.Adam(learning_rate=0.001)

    def create_model(self):
        # tanh keeps the raw output in [-1, 1]; the Lambda layer rescales it to
        # the environment's action range [-action_bound, action_bound].
        return tf.keras.Sequential([
            layers.Input((self.state_dim,)),
            layers.Dense(256, activation='relu'),
            layers.Dense(256, activation='relu'),
            layers.Dense(self.action_dim, activation='tanh'),
            layers.Lambda(lambda x: x * self.action_bound)
        ])

    def train(self, states, q_grads):
        # Optional update that applies the deterministic policy gradient using
        # precomputed dQ/da values (q_grads). The training function in step 5
        # trains the actor directly through the critic instead.
        with tf.GradientTape() as tape:
            actions = self.model(states)
            loss = tf.reduce_mean(-actions * q_grads)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))

    def predict(self, state):
        return self.model.predict(state, verbose=0)
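As a quick, optional sanity check, the actor can be exercised on its own; the dimensions below are Pendulum-v1's (state_dim=3, action_dim=1, action_bound=2.0) and are used purely for illustration:

# Untrained actor: random but correctly shaped and bounded output
actor_demo = Actor(state_dim=3, action_dim=1, action_bound=2.0)
dummy_state = np.zeros((1, 3), dtype=np.float32)
print(actor_demo.predict(dummy_state))  # shape (1, 1), values inside [-2.0, 2.0]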
class Critic:
    """Q-network: maps a (state, action) pair to a scalar value estimate."""

    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.model = self.create_model()
        self.opt = tf.keras.optimizers.Adam(learning_rate=0.002)

    def create_model(self):
        # The state and action are embedded separately, then concatenated.
        state_input = layers.Input((self.state_dim,))
        state_out = layers.Dense(16, activation='relu')(state_input)
        state_out = layers.Dense(32, activation='relu')(state_out)
        action_input = layers.Input((self.action_dim,))
        action_out = layers.Dense(32, activation='relu')(action_input)
        concat = layers.Concatenate()([state_out, action_out])
        out = layers.Dense(256, activation='relu')(concat)
        out = layers.Dense(256, activation='relu')(out)
        outputs = layers.Dense(1)(out)
        return tf.keras.Model([state_input, action_input], outputs)

    def predict(self, state, action):
        return self.model.predict([state, action], verbose=0)

    def train(self, state, action, reward, next_state, done,
              target_actor, target_critic, gamma):
        # Optional standalone update: build the TD target from the target networks
        # (the TargetNetwork wrappers defined in step 4) and regress Q(s, a) onto it.
        # The main loop below uses the batched train_ddpg function from step 5 instead.
        target_actions = target_actor.predict(next_state)
        future_rewards = target_critic.predict([next_state, target_actions])
        updated_q_values = reward + gamma * future_rewards * (1 - done)
        with tf.GradientTape() as tape:
            q_values = self.model([state, action])
            loss = tf.keras.losses.MeanSquaredError()(updated_q_values, q_values)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
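The critic can be sanity-checked the same way (again with Pendulum-v1's dimensions, purely for illustration):

critic_demo = Critic(state_dim=3, action_dim=1)
dummy_states = np.zeros((2, 3), dtype=np.float32)   # batch of two states
dummy_actions = np.zeros((2, 1), dtype=np.float32)  # batch of two actions
print(critic_demo.predict(dummy_states, dummy_actions))  # shape (2, 1): one Q-value per pair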
4. Define the Target Networks
class TargetNetwork:
    """A slowly updated copy of a main network, used to stabilize the TD targets."""

    def __init__(self, model, tau):
        self.model = model   # a network with the same architecture as the main one
        self.tau = tau       # soft-update rate, e.g. 0.001

    def predict(self, inputs):
        # Delegate inference: a state batch for the actor, or [states, actions] for the critic.
        return self.model.predict(inputs, verbose=0)

    def update_target(self, main_model):
        # Polyak averaging: theta_target <- tau * theta_main + (1 - tau) * theta_target
        main_weights = main_model.get_weights()
        target_weights = self.model.get_weights()
        for i in range(len(main_weights)):
            target_weights[i] = self.tau * main_weights[i] + (1 - self.tau) * target_weights[i]
        self.model.set_weights(target_weights)
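A minimal illustration of the soft update, reusing the Actor class from step 3 (tau = 0.5 here only to make the blending easy to see; training uses a much smaller value):

main = Actor(3, 1, 2.0)
target = TargetNetwork(Actor(3, 1, 2.0).model, tau=0.5)
w_before = target.model.get_weights()[0][0][0]   # one weight of the first Dense layer
target.update_target(main.model)
w_after = target.model.get_weights()[0][0][0]
print(w_before, '->', w_after)  # moves halfway toward the corresponding main-network weight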
5. Define the Training Procedure
def train_ddpg(actor, critic, target_actor, target_critic, buffer, batch_size, gamma):
    # Skip updates until the replay buffer holds at least one full batch.
    if len(buffer) < batch_size:
        return
    samples = random.sample(buffer, batch_size)
    states, actions, rewards, next_states, dones = zip(*samples)
    states = np.array(states, dtype=np.float32).reshape(batch_size, -1)
    actions = np.array(actions, dtype=np.float32).reshape(batch_size, -1)
    rewards = np.array(rewards, dtype=np.float32).reshape(batch_size, 1)
    next_states = np.array(next_states, dtype=np.float32).reshape(batch_size, -1)
    dones = np.array(dones, dtype=np.float32).reshape(batch_size, 1)

    # Critic update: regress Q(s, a) onto the TD target built from the target networks.
    target_actions = target_actor.predict(next_states)
    future_rewards = target_critic.predict([next_states, target_actions])
    updated_q_values = rewards + gamma * future_rewards * (1 - dones)
    with tf.GradientTape() as tape:
        q_values = critic.model([states, actions])
        critic_loss = tf.keras.losses.MeanSquaredError()(updated_q_values, q_values)
    critic_grads = tape.gradient(critic_loss, critic.model.trainable_variables)
    critic.opt.apply_gradients(zip(critic_grads, critic.model.trainable_variables))

    # Actor update: maximize the critic's value of the actor's own actions.
    with tf.GradientTape() as tape:
        new_actions = actor.model(states)
        actor_q_values = critic.model([states, new_actions])
        actor_loss = -tf.reduce_mean(actor_q_values)
    actor_grads = tape.gradient(actor_loss, actor.model.trainable_variables)
    actor.opt.apply_gradients(zip(actor_grads, actor.model.trainable_variables))

    # Soft-update the target networks toward the main networks.
    target_critic.update_target(critic.model)
    target_actor.update_target(actor.model)
6. Main Training Loop
env = gym.make('Pendulum-v1')
state_dim = env.observation_space.shape[0]    # 3 for Pendulum-v1
action_dim = env.action_space.shape[0]        # 1 for Pendulum-v1
action_bound = env.action_space.high[0]       # 2.0 for Pendulum-v1

# Hyperparameters
gamma = 0.99        # discount factor
tau = 0.001         # soft-update rate for the target networks
batch_size = 64
episodes = 1000

actor = Actor(state_dim, action_dim, action_bound)
critic = Critic(state_dim, action_dim)

# Target networks: fresh copies wrapped in TargetNetwork (step 4),
# initialized with the same weights as the main networks.
target_actor = TargetNetwork(Actor(state_dim, action_dim, action_bound).model, tau)
target_critic = TargetNetwork(Critic(state_dim, action_dim).model, tau)
target_actor.model.set_weights(actor.model.get_weights())
target_critic.model.set_weights(critic.model.get_weights())

buffer = deque(maxlen=100000)
for episode in range(episodes):
    state, _ = env.reset()                    # gym >= 0.26: reset returns (obs, info)
    state = np.reshape(state, [1, state_dim])
    done = False
    total_reward = 0
    while not done:
        # Deterministic action plus Gaussian exploration noise, clipped to the valid range.
        action = actor.predict(state)[0]
        action = np.clip(action + np.random.normal(0, 0.1 * action_bound, size=action_dim),
                         -action_bound, action_bound)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        next_state = np.reshape(next_state, [1, state_dim])
        # Only true terminal states should cut off bootstrapping, so store `terminated`.
        buffer.append((state, action, reward, next_state, float(terminated)))
        state = next_state
        total_reward += reward
        train_ddpg(actor, critic, target_actor, target_critic, buffer, batch_size, gamma)
    print(f'Episode: {episode}, Total Reward: {total_reward:.1f}')
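After training, you can evaluate the learned policy without exploration noise. This is a minimal sketch that assumes env, actor, and state_dim from the code above are still in scope:

eval_episodes = 5
for ep in range(eval_episodes):
    state, _ = env.reset()
    state = np.reshape(state, [1, state_dim])
    done, episode_return = False, 0.0
    while not done:
        action = actor.predict(state)[0]      # greedy action, no noise
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state = np.reshape(next_state, [1, state_dim])
        episode_return += reward
    print(f'Eval episode {ep}: return = {episode_return:.1f}')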
Summary
This article walked through a complete DDPG implementation: actor and critic networks, target networks with soft updates, an experience replay buffer, and a training loop on Pendulum-v1. I hope it helps you better understand and apply the DDPG algorithm, and encourages you to explore the broad field of deep reinforcement learning further.