[Reinforcement Learning] Actor-Critic
The Actor-Critic Algorithm
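Contents
- The Actor-Critic Algorithm
  - 1. Actor-Critic principles
    - 1.1. Overview
    - 1.2. Advantages and disadvantages
    - 1.3. Policy network and value network
      - 1.3.1. Policy network (Actor)
      - 1.3.2. Value network (Critic)
    - 1.4. Flowchart and pseudocode
  - 2. Example: Pendulum-v1
    - 2.1. Theory
      - 2.1.1. Environment
      - 2.1.2. State
      - 2.1.3. Action
      - 2.1.4. Reward
      - 2.1.5. Start and termination conditions
      - 2.1.6. Policy network (Actor)
      - 2.1.7. Value network (Critic)
      - 2.1.8. Actor-Critic network diagram
    - 2.2. Code
      - 2.2.1. Results
      - 2.2.2. Code
  - 3. Example: CartPole-v1
    - 3.1. Theory
      - 3.1.1. Environment, state, action, reward
      - 3.1.2. Policy network (Actor)
      - 3.1.3. Value network (Critic)
      - 3.1.4. Actor-Critic network diagram
    - 3.2. Code
      - 3.2.1. Results
      - 3.2.2. Code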
1. Actor-Critic principles
Reference paper:
Actor-Critic Algorithms
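1.1. Overview
Actor-Critic combines value-based methods with policy-based methods.
Actor:
- The Actor descends from policy-based methods (Policy Gradient).
- Because the Actor outputs a policy directly, the algorithm can handle continuous actions, which removes the restriction that value-based methods place on the action space.
- The Actor is the performer: it adjusts the probability of choosing each action according to the Critic's evaluation.
Critic:
- The Critic descends from value-based methods (value function approximation, e.g. Q-learning).
- The Critic makes single-step updates possible, which fixes the inefficiency of policy-gradient methods that update only once per episode.
- The Critic is the reviewer: it judges how good the action chosen by the Actor was.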
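As a rough illustration of this division of labor, the sketch below (plain Python, not taken from the code later in this post) uses a softmax policy over two action preferences as a stand-in Actor and hands it a TD error as if it came from the Critic. The names `softmax` and `actor_step` and the learning rate are assumptions made up for the example.

```python
import math


def softmax(prefs):
    """Turn action preferences into probabilities."""
    m = max(prefs)
    exps = [math.exp(p - m) for p in prefs]
    total = sum(exps)
    return [e / total for e in exps]


def actor_step(prefs, action, td_error, lr=0.1):
    """One policy-gradient step on softmax preferences.

    For a softmax policy, d log pi(a) / d pref_k = 1[k == a] - pi(k).
    """
    probs = softmax(prefs)
    return [
        p + lr * td_error * ((1.0 if k == action else 0.0) - probs[k])
        for k, p in enumerate(prefs)
    ]


prefs = [0.0, 0.0]                                       # both actions equally likely
praised = actor_step(prefs, action=0, td_error=+1.0)     # Critic: better than expected
criticized = actor_step(prefs, action=0, td_error=-1.0)  # Critic: worse than expected
```

With these numbers, a positive TD error raises the probability of action 0 from 0.5 to roughly 0.525, and a negative one lowers it to roughly 0.475.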
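1.2. Advantages and disadvantages
Advantages:
- Handles continuous actions and supports single-step updates.
Disadvantages:
- Both neural networks are updated on consecutive states, so successive samples (and therefore successive parameter updates) are strongly correlated. This makes the networks learn poorly and convergence difficult.
(To address this, Google DeepMind introduced DDPG = Actor-Critic + DQN, which extends the two networks to four.)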
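1.3. Policy network and value network
1.3.1. Policy network (Actor)
Update: the Actor is updated with the policy-gradient method.

Loss function:
$$
\delta_t = r_t + \gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t) \\
loss(t) = -\delta_t \log \pi_\theta(a_t \mid s_t)
$$
where $V^{\pi_\theta}(s_t)$ is the state-value function and $\delta_t$ is the TD error, which is treated as a constant when the loss is differentiated with respect to $\theta$.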
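To make the Actor loss concrete, here is a small worked example in plain Python. The numbers (reward 1, value estimates 2 and 3, action probability 0.5, $\gamma = 0.9$) and the helper names `td_error` and `actor_loss` are made up for illustration.

```python
import math


def td_error(r, v_s, v_next, gamma=0.9, done=False):
    """TD error: r + gamma * V(s') - V(s), with no bootstrap on terminal states."""
    target = r + (0.0 if done else gamma * v_next)
    return target - v_s


def actor_loss(log_prob, delta):
    """Single-sample Actor loss: -delta * log pi(a|s)."""
    return -delta * log_prob


delta = td_error(r=1.0, v_s=2.0, v_next=3.0)   # 1 + 0.9 * 3 - 2 = 1.7
loss = actor_loss(math.log(0.5), delta)        # -1.7 * ln(0.5) = 1.7 * ln(2)
```

Because the TD error is positive, minimizing this loss pushes $\log \pi_\theta(a_t \mid s_t)$ up, so the action becomes more likely.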
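1.3.2. Value network (Critic)
The Critic learns by minimizing the temporal-difference (TD) residual.
Loss function:
$$
loss(t) = \frac{1}{2}\left(r_t + \gamma V_w(s_{t+1}) - V_w(s_t)\right)^2
$$
Here the TD target $r_t + \gamma V_w(s_{t+1})$ is held fixed when taking the gradient, so only $V_w(s_t)$ is differentiated.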
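The Critic update can be checked by hand with a linear value function $V_w(s) = w \cdot s$. This is only a sketch under that assumption (the post itself uses a neural network); `value`, `critic_loss` and `critic_step` are hypothetical helper names.

```python
def value(w, s):
    """Linear value function V_w(s) = w . s (an assumption for this sketch)."""
    return sum(wi * si for wi, si in zip(w, s))


def critic_loss(w, s, r, s_next, gamma=0.9):
    """0.5 * (r + gamma * V(s') - V(s))^2"""
    delta = r + gamma * value(w, s_next) - value(w, s)
    return 0.5 * delta ** 2


def critic_step(w, s, r, s_next, gamma=0.9, lr=0.1):
    """Semi-gradient TD(0) step: the TD target is treated as a constant."""
    delta = r + gamma * value(w, s_next) - value(w, s)
    return [wi + lr * delta * si for wi, si in zip(w, s)]


w = [0.0, 0.0]
s, r, s_next = [1.0, 0.0], 1.0, [0.0, 1.0]
before = critic_loss(w, s, r, s_next)   # delta = 1.0, loss = 0.5
w = critic_step(w, s, r, s_next)        # w becomes [0.1, 0.0]
after = critic_loss(w, s, r, s_next)    # delta = 0.9, loss = 0.405
```

One step moves $V_w(s_t)$ a tenth of the way toward the target, and the loss drops from 0.5 to 0.405.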
1.4. Flowchart and pseudocode
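The overall loop can be sketched as follows. This is not the original flowchart; it is a minimal tabular version of one-step Actor-Critic on a made-up four-state corridor (start in state 0, reward 1 for reaching state 3), written in plain Python so that the five steps of each iteration are easy to follow. The environment, the hyperparameters and all names are assumptions for illustration.

```python
import math
import random

N_STATES, GOAL = 4, 3   # toy corridor: states 0..3, state 3 is terminal
GAMMA, ACTOR_LR, CRITIC_LR = 0.9, 0.1, 0.1


def softmax(prefs):
    m = max(prefs)
    exps = [math.exp(p - m) for p in prefs]
    total = sum(exps)
    return [e / total for e in exps]


def step(state, action):
    """Hypothetical environment: action 0 moves left, action 1 moves right."""
    nxt = max(0, state - 1) if action == 0 else state + 1
    done = nxt == GOAL
    return nxt, (1.0 if done else 0.0), done


def train(episodes=500, max_steps=100, seed=0):
    rng = random.Random(seed)
    prefs = [[0.0, 0.0] for _ in range(N_STATES)]   # Actor parameters
    values = [0.0] * N_STATES                       # Critic parameters
    for _ in range(episodes):
        s = 0
        for _ in range(max_steps):
            probs = softmax(prefs[s])
            a = 0 if rng.random() < probs[0] else 1      # 1. Actor samples an action
            s_next, r, done = step(s, a)                 # 2. environment responds
            target = r + (0.0 if done else GAMMA * values[s_next])
            delta = target - values[s]                   # 3. Critic computes the TD error
            values[s] += CRITIC_LR * delta               # 4. Critic update
            for k in range(2):                           # 5. Actor update
                grad = (1.0 if k == a else 0.0) - probs[k]
                prefs[s][k] += ACTOR_LR * delta * grad
            if done:
                break
            s = s_next
    return prefs, values


prefs, values = train()
right_probs = [softmax(p)[1] for p in prefs[:GOAL]]   # P(move right) in states 0..2
```

Both updates happen after every single step, which is the single-step update property discussed in section 1.1.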


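2. Example: Pendulum-v1
Pendulum is an example with a continuous state space and a continuous action space.
2.1. Theory
Source code: gym/gym/envs/classic_control/pendulum.py at master · openai/gym · GitHub
2.1.1. Environment
Pendulum - Gymnasium Documentation (farama.org)
The inverted pendulum swing-up problem is a classic problem in control theory. The system consists of a pendulum attached at one end to a fixed point, with the other end free. The pendulum starts in a random position, and the goal is to apply torque to the free end to swing it into the upright position, with its center of gravity directly above the fixed point.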

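2.1.2. State
The agent observes the pendulum's state as a vector of shape (3,): the cosine $\cos\theta$ and the sine $\sin\theta$ of the pendulum angle (in that order), followed by the angular velocity $\dot{\theta}$.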
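A short sketch of how such an observation relates to the underlying angle, assuming the layout documented by Gymnasium for Pendulum-v1 (`[cos(theta), sin(theta), angular velocity]`). The helper names are hypothetical.

```python
import math


def angle_to_obs(theta, theta_dot):
    """Build the 3-element observation from the angle and angular velocity."""
    return [math.cos(theta), math.sin(theta), theta_dot]


def obs_to_angle(obs):
    """Recover the angle in (-pi, pi] from the cosine/sine pair."""
    x, y, _ = obs
    return math.atan2(y, x)


obs = angle_to_obs(math.pi / 2, 0.5)   # pendulum rotated a quarter turn from upright
theta = obs_to_angle(obs)
```

Encoding the angle as a cosine/sine pair avoids the discontinuity at $\pm\pi$ that a raw angle would have.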

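2.1.3. Action
The action is a torque $a$ of shape (1,), limited to [-2, 2], applied to the free end of the pendulum. Counterclockwise is the positive direction.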

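2.1.4. Reward
The reward function is
$$
r = -\left(\theta^2 + 0.1\,\dot{\theta}^2 + 0.001\,a^2\right)
$$
where $\theta$ is the angle normalized to $[-\pi, \pi]$, with 0 meaning upright. The reward is 0 when the pendulum is upright and motionless (zero angular velocity, zero torque) and negative in every other situation. The reward lies in approximately [-16.27, 0]; here we set
$$
r = (r + 8)/8
$$
which rescales the reward to roughly [-1, 1] and helps convergence.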
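The reward and its rescaling can be reproduced with a few lines of plain Python. This is a sketch of the formula above rather than the environment's own code; the limits used for the worst case (angular speed 8, torque 2) are the bounds of Pendulum-v1, and the function names are hypothetical.

```python
import math


def angle_normalize(theta):
    """Wrap an angle to [-pi, pi)."""
    return ((theta + math.pi) % (2 * math.pi)) - math.pi


def pendulum_reward(theta, theta_dot, torque):
    """r = -(theta^2 + 0.1 * theta_dot^2 + 0.001 * torque^2)"""
    th = angle_normalize(theta)
    return -(th ** 2 + 0.1 * theta_dot ** 2 + 0.001 * torque ** 2)


def rescale(r):
    """The rescaling used in this post: (r + 8) / 8."""
    return (r + 8.0) / 8.0


best = pendulum_reward(0.0, 0.0, 0.0)        # upright, at rest, no torque
worst = pendulum_reward(math.pi, 8.0, 2.0)   # hanging down at maximum speed and torque
```

The best case gives a reward of 0 (1.0 after rescaling); the worst case gives about -16.27 (about -1.03 after rescaling), so the rescaled reward stays close to the interval [-1, 1].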
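2.1.5. Start and termination conditions
Start: the pendulum starts in a random position. The initial state is a random angle $\theta$ in $[-\pi, \pi]$ and a random angular velocity $\dot{\theta}$ in $[-1, 1]$.
Termination: the environment has no terminal state; by default an episode only ends by truncation after 200 time steps.

Note: in the current v1 source the environment class itself always returns Truncation = False, no matter whether the episode is longer than 200 steps. The time truncation described in the official API is applied by the TimeLimit wrapper that gym.make adds around the environment. The code in this post removes that wrapper with env.unwrapped, so the step limit has to be enforced by hand.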
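The effect described in the note can be imitated without Gymnasium. The sketch below is a simplified illustration of what a time-limit wrapper does, not Gymnasium's actual implementation; `NeverEndingEnv` and `StepLimit` are hypothetical classes invented for this example.

```python
class NeverEndingEnv:
    """Hypothetical stand-in for the raw environment: it never ends an episode."""

    def reset(self):
        return 0.0, {}

    def step(self, action):
        # observation, reward, terminated, truncated, info
        return 0.0, 0.0, False, False, {}


class StepLimit:
    """Simplified time-limit wrapper: sets truncated=True after max_episode_steps."""

    def __init__(self, env, max_episode_steps):
        self.env = env
        self.max_episode_steps = max_episode_steps
        self.elapsed = 0

    def reset(self):
        self.elapsed = 0
        return self.env.reset()

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.elapsed += 1
        if self.elapsed >= self.max_episode_steps:
            truncated = True
        return obs, reward, terminated, truncated, info


def episode_length(env, cap=1000):
    """Run until the env reports an episode end, or give up after `cap` steps."""
    env.reset()
    for t in range(1, cap + 1):
        _, _, terminated, truncated, _ = env.step(0.0)
        if terminated or truncated:
            return t
    return cap


wrapped_len = episode_length(StepLimit(NeverEndingEnv(), 200))   # stops at step 200
raw_len = episode_length(NeverEndingEnv())                       # only the cap stops it
```

Without the wrapper the loop never sees an episode end, which matches the always-False Truncation flag observed on the unwrapped environment.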

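2.1.6. Policy network (Actor)
Let the Critic value network be $V_w$, with parameters $w$.
Let the Actor policy network be $\pi_\theta$, with parameters $\theta$.
Loss function:
$$
\delta_t = r_t + \gamma V_w(s_{t+1}) - V_w(s_t) \quad \text{(TD error)} \\
loss(t) = -\frac{1}{N_a}\sum_{i=1}^{N_a} \delta_t \log f(a_t \mid s_t)
$$
where $f$ is the probability density function of the normal distribution:
$$
f(x) = \frac{1}{\sqrt{2\pi}\,\sigma}\exp\left(-\frac{(x-\mu)^2}{2\sigma^2}\right)
$$
The average over $N_a$ corresponds to the torch.mean call in the code.
Network structure:
The policy network takes the current state as input and outputs the mean and standard deviation of the normal distribution over the continuous action.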
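For a single sample, the Gaussian log-density and the resulting loss can be computed by hand. The following plain-Python sketch mirrors the formula above; the function names and the example numbers are assumptions, and the gradient with respect to the mean is derived analytically rather than by autograd.

```python
import math


def normal_log_prob(x, mu, sigma):
    """Log of the normal density f(x) with mean mu and standard deviation sigma."""
    return (-((x - mu) ** 2) / (2.0 * sigma ** 2)
            - math.log(sigma) - 0.5 * math.log(2.0 * math.pi))


def actor_loss(action, mu, sigma, td_error):
    """Single-sample loss: -delta * log f(a|s)."""
    return -td_error * normal_log_prob(action, mu, sigma)


def dloss_dmu(action, mu, sigma, td_error):
    """Analytic gradient of the loss with respect to the mean mu."""
    return -td_error * (action - mu) / sigma ** 2


lp = normal_log_prob(0.0, 0.0, 1.0)                              # log(1 / sqrt(2 * pi))
grad = dloss_dmu(action=1.0, mu=0.0, sigma=1.0, td_error=0.5)    # -0.5
```

The gradient is negative here, so a gradient-descent step increases $\mu$: the mean moves toward an action that turned out better than the Critic expected.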

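2.1.7. Value network (Critic)
Loss function:
$$
loss(t) = \frac{1}{N_a}\sum_{i=1}^{N_a}\frac{1}{2}\left(G_t - V_w(s_t)\right)^2 \\
= \frac{1}{N_a}\sum_{i=1}^{N_a}\frac{1}{2}\left(r_t + \gamma V_w(s_{t+1}) - V_w(s_t)\right)^2
$$
where $G_t = r_t + \gamma V_w(s_{t+1})$ is the TD target.
Minimizing the loss: gradient descent.
Network structure:
The value network takes the current state as input and outputs the state value.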

2.1.8. Actor-Critic network diagram

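2.2. Code
2.2.1. Results

Note: within 500 episodes the Actor-Critic algorithm shows no sign of converging. I am not sure whether the code is wrong or the training time is simply too short (anyone who reads this far is welcome to point out mistakes).
2.2.2. Code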
>ActorCritic_Pendulum.py
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


# Actor: the performer (policy network)
class PolicyNet(nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(n_feature, n_hidden)
        self.fc1.weight.data.normal_(0, 0.1)  # initialize the weights
        self.fc_mu = nn.Linear(n_hidden, n_output)  # mean of the Gaussian
        self.fc_mu.weight.data.normal_(0, 0.1)
        self.fc_std = nn.Linear(n_hidden, n_output)  # standard deviation of the Gaussian
        self.fc_std.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        # Mean and standard deviation of the Gaussian action distribution for this state
        mu = 2.0 * torch.tanh(self.fc_mu(x))  # mu lies in [-2, 2]
        std = F.softplus(self.fc_std(x))  # softplus keeps the standard deviation positive
        return mu, std


# Critic: the reviewer (value network)
class ValueNet(nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(ValueNet, self).__init__()
        self.fc1 = nn.Linear(n_feature, n_hidden)
        self.fc1.weight.data.normal_(0, 0.1)  # initialize the weights
        self.out = nn.Linear(n_hidden, n_output)
        self.out.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.out(x)


class ActorCritic:
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, gamma, device):
        self.actor = PolicyNet(n_feature=state_dim, n_hidden=hidden_dim, n_output=action_dim).to(device)
        self.critic = ValueNet(n_feature=state_dim, n_hidden=hidden_dim, n_output=1).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)  # policy network optimizer
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)  # value network optimizer
        self.gamma = gamma
        self.device = device
        # # Optionally load a saved model
        # self.actor.load_state_dict(torch.load('./model_parameter/actor_params_epoch_0_reward_5.28.pkl'))
        # self.critic.load_state_dict(torch.load('./model_parameter/critic_params_epoch_0_reward_5.28.pkl'))

    # Choose an action by sampling from the action distribution of the current state
    def choose_action(self, state):
        state = torch.tensor(np.array([state]), dtype=torch.float).to(self.device)
        with torch.no_grad():  # no gradient is needed while acting
            mu, std = self.actor(state)
        action_dist = torch.distributions.Normal(mu, std)  # normal distribution N(mu, sigma) to sample from
        action = action_dist.sample()
        # Extract the Python number from the tensor
        return [action.item()]

    def learn(self, s, a, r, s_, done):
        states = torch.tensor(np.array([s]), dtype=torch.float).to(self.device)
        actions = torch.tensor([a], dtype=torch.float).view(-1, 1).to(self.device)
        rewards = float(r)
        next_states = torch.tensor(np.array([s_]), dtype=torch.float).to(self.device)
        dones = 1.0 if done else 0.0

        # TD target and TD error
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)

        # Policy network: log-density of the taken action under the current policy.
        # mu and std must NOT be detached here, otherwise no gradient reaches the actor
        # and the policy never changes.
        mu, std = self.actor(states)
        action_dists = torch.distributions.Normal(mu, std)
        log_prob = action_dists.log_prob(actions)
        actor_loss = torch.mean(-log_prob * td_delta.detach())  # the TD error is treated as a constant
        step_actor_loss = actor_loss.cpu().item()

        # Value network: mean squared TD error against a fixed target
        critic_loss = F.mse_loss(self.critic(states), td_target.detach())
        step_critic_loss = critic_loss.cpu().item()

        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        actor_loss.backward()  # gradients of the policy network
        critic_loss.backward()  # gradients of the value network
        self.actor_optimizer.step()  # update the policy network
        self.critic_optimizer.step()  # update the value network
        return step_actor_loss, step_critic_loss

    def save_model(self, i_episode, reward_episode):
        # The ./model_parameter/ directory must already exist
        torch.save(self.actor.state_dict(), f'./model_parameter/actor_params_epoch_{i_episode}_reward_{reward_episode}.pkl')
        torch.save(self.critic.state_dict(), f'./model_parameter/critic_params_epoch_{i_episode}_reward_{reward_episode}.pkl')

>Main_Pendulum.py
import numpy as np
import torch
import gymnasium as gym
import matplotlib.pyplot as plt
from tqdm import tqdm
from ActorCritic_Pendulum import ActorCritic

# Hyperparameters
ACTOR_LR = 1e-4  # actor learning rate
CRITIC_LR = 5e-3  # critic learning rate
GAMMA = 0.9  # discount factor
N_EPISODES = 500
MAX_STEPS = 200  # step cap per episode, to keep training time short
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Create the environment
env = gym.make('Pendulum-v1', render_mode="human")
env = env.unwrapped  # strip the wrappers (including the time limit)
# env.action_space.seed(123)
torch.manual_seed(0)
N_ACTIONS = env.action_space.shape[0]  # dimension of the continuous action: 1
N_STATES = env.observation_space.shape[0]  # dimension of the observation: 3
N_NEURON = 128  # neurons in the hidden layer
agent = ActorCritic(N_STATES, N_NEURON, N_ACTIONS, ACTOR_LR, CRITIC_LR, GAMMA, device)
r_list = []
with tqdm(total=N_EPISODES) as pbar:
    for i_episode in range(N_EPISODES):
        episode_r = 0
        episode_actor_loss = []
        episode_critic_loss = []
        i_step = 0
        s, _ = env.reset()  # reset the environment
        while True:
            env.render()  # show the animation
            i_step += 1
            # Choose an action and get the environment's feedback
            a = agent.choose_action(s)
            s_, r, terminated, _, info = env.step(a)
            # Rescale the reward
            r = (r + 8.0) / 8.0
            episode_r += r
            # Learn from every transition, including the last one of the episode.
            # Only a true termination (not the step cap) stops bootstrapping.
            step_actor_loss, step_critic_loss = agent.learn(s, a, r, s_, terminated)
            episode_actor_loss.append(step_actor_loss)
            episode_critic_loss.append(step_critic_loss)
            # End the episode on termination or when the step cap is reached
            if terminated or i_step >= MAX_STEPS:
                break
            s = s_
        r_list.append(episode_r)
        pbar.desc = "epoch %d " % (i_episode)
        pbar.set_postfix({
            'avg_actor_loss': '%0.3f' % np.mean(episode_actor_loss),
            'avg_critic_loss': '%0.3f' % np.mean(episode_critic_loss),
            'return': '%0.3f' % episode_r,
            'epoch_step': '%d' % i_step  # number of steps in this episode
        })
        pbar.update(1)
episodes_list = list(range(len(r_list)))
plt.plot(episodes_list, r_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Actor-Critic on {}'.format('Pendulum-v1'))
plt.show()

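3. Example: CartPole-v1
CartPole is an example with a continuous state space and a discrete action space.
3.1. Theory
3.1.1. Environment, state, action, reward
See the introduction in the DQN section.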
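3.1.2. Policy network (Actor)
Let the Critic value network be $V_w$, with parameters $w$.
Let the Actor policy network be $\pi_\theta$, with parameters $\theta$.
Loss function:
$$
\delta_t = r_t + \gamma V_w(s_{t+1}) - V_w(s_t) \quad \text{(TD error)} \\
loss(t) = -\frac{1}{N_a}\sum_{i=1}^{N_a} \delta_t \log \pi_\theta(a_t \mid s_t)
$$
Network structure:
The policy network takes the current state as input and outputs a probability distribution over the discrete actions.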
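For the discrete case, the pieces are a softmax over the network outputs, a categorical sample, and the log-probability of the sampled action. The plain-Python sketch below uses made-up logits and a made-up TD error; `softmax`, `sample_action` and `actor_loss` are hypothetical helper names, not functions from the code in section 3.2.

```python
import math
import random


def softmax(logits):
    """Convert raw network outputs into action probabilities."""
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sample_action(probs, rng):
    """Draw an action index according to the categorical distribution."""
    return rng.choices(range(len(probs)), weights=probs)[0]


def actor_loss(probs, action, td_error):
    """Single-sample loss: -delta * log pi(a|s)."""
    return -td_error * math.log(probs[action])


probs = softmax([0.0, math.log(3.0)])       # [0.25, 0.75]
action = sample_action(probs, random.Random(0))
loss = actor_loss(probs, 1, td_error=2.0)   # -2 * ln(0.75)
```

Picking out `probs[action]` plays the same role as the `gather` call in the PyTorch code.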

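3.1.3. Value network (Critic)
Loss function:
$$
loss(t) = \frac{1}{N_a}\sum_{i=1}^{N_a}\frac{1}{2}\left(G_t - V_w(s_t)\right)^2 \\
= \frac{1}{N_a}\sum_{i=1}^{N_a}\frac{1}{2}\left(r_t + \gamma V_w(s_{t+1}) - V_w(s_t)\right)^2
$$
where $G_t = r_t + \gamma V_w(s_{t+1})$ is the TD target.
Minimizing the loss: gradient descent.
Network structure:
The value network takes the current state as input and outputs the state value.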

3.1.4. Actor-Critic network diagram

3.2. Code
3.2.1. Results

3.2.2. Code
>ActorCritic_CartPole.py
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


# Actor: the performer (policy network)
class PolicyNet(nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(n_feature, n_hidden)
        self.fc1.weight.data.normal_(0, 0.1)  # initialize the weights
        self.out = nn.Linear(n_hidden, n_output)
        self.out.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        # Action probability distribution for this state (via softmax)
        actions_pro_distributions = F.softmax(self.out(x), dim=1)
        return actions_pro_distributions


# Critic: the reviewer (value network)
class ValueNet(nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(ValueNet, self).__init__()
        self.fc1 = nn.Linear(n_feature, n_hidden)
        self.fc1.weight.data.normal_(0, 0.1)  # initialize the weights
        self.out = nn.Linear(n_hidden, n_output)
        self.out.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.out(x)


class ActorCritic:
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, gamma, device):
        self.actor = PolicyNet(n_feature=state_dim, n_hidden=hidden_dim, n_output=action_dim).to(device)
        self.critic = ValueNet(n_feature=state_dim, n_hidden=hidden_dim, n_output=1).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)  # policy network optimizer
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)  # value network optimizer
        self.gamma = gamma
        self.device = device
        # # Optionally load a saved model
        # self.actor.load_state_dict(torch.load('./model_parameter/actor_params_epoch_0_reward_5.28.pkl'))
        # self.critic.load_state_dict(torch.load('./model_parameter/critic_params_epoch_0_reward_5.28.pkl'))

    # Choose an action by sampling from the action distribution of the current state
    def choose_action(self, state):
        state = torch.tensor(np.array([state]), dtype=torch.float).to(self.device)
        with torch.no_grad():  # no gradient is needed while acting
            probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)  # categorical distribution over action indices
        action = action_dist.sample()
        # Extract the Python number from the tensor
        return action.item()

    def learn(self, s, a, r, s_, done):
        states = torch.tensor(np.array([s]), dtype=torch.float).to(self.device)
        actions = torch.tensor([a], dtype=torch.long).view(-1, 1).to(self.device)
        rewards = float(r)
        next_states = torch.tensor(np.array([s_]), dtype=torch.float).to(self.device)
        dones = 1.0 if done else 0.0

        # TD target and TD error
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)

        # Policy network: log-probability of the taken action under the current policy
        log_prob = torch.log(self.actor(states).gather(1, actions))
        actor_loss = torch.mean(-log_prob * td_delta.detach())  # the TD error is treated as a constant
        step_actor_loss = actor_loss.cpu().item()

        # Value network: mean squared TD error against a fixed target
        critic_loss = F.mse_loss(self.critic(states), td_target.detach())
        step_critic_loss = critic_loss.cpu().item()

        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        actor_loss.backward()  # gradients of the policy network
        critic_loss.backward()  # gradients of the value network
        self.actor_optimizer.step()  # update the policy network
        self.critic_optimizer.step()  # update the value network
        return step_actor_loss, step_critic_loss

    def save_model(self, i_episode, reward_episode):
        # The ./model_parameter/ directory must already exist
        torch.save(self.actor.state_dict(), f'./model_parameter/actor_params_epoch_{i_episode}_reward_{reward_episode}.pkl')
        torch.save(self.critic.state_dict(), f'./model_parameter/critic_params_epoch_{i_episode}_reward_{reward_episode}.pkl')

>Main_CartPole.py
import numpy as np
import torch
import gymnasium as gym
import matplotlib.pyplot as plt
from tqdm import tqdm
from ActorCritic_CartPole import ActorCritic  # module name must match the file name exactly

# Hyperparameters
ACTOR_LR = 0.001  # actor learning rate
CRITIC_LR = 0.01  # critic learning rate
GAMMA = 0.9  # discount factor
N_EPISODES = 500
MAX_STEPS = 200  # step cap per episode, to keep training time short
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Create the environment
env = gym.make('CartPole-v1', render_mode="human")
env = env.unwrapped  # strip the wrappers so that x_threshold etc. are accessible
# env.action_space.seed(123)
torch.manual_seed(0)
N_ACTIONS = env.action_space.n  # number of discrete actions
N_STATES = env.observation_space.shape[0]  # dimension of the observation
N_NEURON = 128  # neurons in the hidden layer
agent = ActorCritic(N_STATES, N_NEURON, N_ACTIONS, ACTOR_LR, CRITIC_LR, GAMMA, device)
r_list = []
with tqdm(total=N_EPISODES) as pbar:
    for i_episode in range(N_EPISODES):
        episode_r = 0
        episode_actor_loss = []
        episode_critic_loss = []
        i_step = 0
        s, _ = env.reset()  # reset the environment
        while True:
            env.render()  # show the animation
            i_step += 1
            # Choose an action and get the environment's feedback
            a = agent.choose_action(s)
            s_, r, terminated, _, info = env.step(a)
            # Reshape the reward to speed up learning: the further the pole
            # tilts and the further the cart drifts, the smaller the reward
            x, x_dot, theta, theta_dot = s_
            r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
            r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
            r = r1 + r2  # at most 0.7 per step
            episode_r += r
            # Learn from every transition, including the terminal one.
            # Only a true termination (not the step cap) stops bootstrapping.
            step_actor_loss, step_critic_loss = agent.learn(s, a, r, s_, terminated)
            episode_actor_loss.append(step_actor_loss)
            episode_critic_loss.append(step_critic_loss)
            # End the episode on termination or when the step cap is reached
            if terminated or i_step >= MAX_STEPS:
                break
            s = s_
        # # Save the model
        # if i_episode % 100 == 0:
        #     agent.save_model(i_episode, round(episode_r, 2))
        r_list.append(episode_r)
        pbar.desc = "epoch %d " % (i_episode)
        pbar.set_postfix({
            'avg_actor_loss': '%0.3f' % np.mean(episode_actor_loss),
            'avg_critic_loss': '%0.3f' % np.mean(episode_critic_loss),
            'return': '%0.3f' % episode_r,
            'epoch_step': '%d' % i_step  # number of steps in this episode
        })
        pbar.update(1)
# episodes_list = list(range(len(r_list)))
# plt.plot(episodes_list, r_list)
# plt.xlabel('Episodes')
# plt.ylabel('Returns')
# plt.title('Actor-Critic on {}'.format('CartPole-v1'))
# plt.show()
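The reward shaping used in Main_CartPole.py can be isolated as a pure function to see what values it produces. In this sketch the two thresholds are written out as constants (2.4 for the cart position and 12 degrees in radians for the pole angle, the values CartPole uses for its termination limits) instead of being read from `env`; the function name `shaped_reward` is hypothetical.

```python
import math

X_THRESHOLD = 2.4                               # cart position limit
THETA_THRESHOLD = 12 * 2 * math.pi / 360        # pole angle limit: 12 degrees in radians


def shaped_reward(x, theta):
    """Reward that shrinks as the cart drifts from the center and the pole tilts."""
    r1 = (X_THRESHOLD - abs(x)) / X_THRESHOLD - 0.8
    r2 = (THETA_THRESHOLD - abs(theta)) / THETA_THRESHOLD - 0.5
    return r1 + r2


centered = shaped_reward(0.0, 0.0)                       # best case: 0.2 + 0.5 = 0.7
at_limits = shaped_reward(X_THRESHOLD, THETA_THRESHOLD)  # worst case: -0.8 - 0.5 = -1.3
```

The per-step reward therefore ranges from -1.3 at the failure limits to 0.7 with the cart centered and the pole upright, so with the 200-step cap an episode return cannot exceed 140.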

