(17-6-03)基于强化学习的自动驾驶系统:强化学习代理
17.7.4 强化学习代理
创建Python文件reinforcement/sac_agent.py。
该文件负责开发一个名为SACAgent的代理类。
该代理基于Soft Actor-Critic算法实现了强化学习能力。
具体而言,在解决连续动作空间下的强化学习问题时,
该代理能够有效执行相应的任务逻辑。
实现一个名为init_weight的函数来处理神经网络参数的权重初始化工作。该函数能够根据输入指定的不同初始化策略进行操作,默认采用He正态化策略('he normal')。支持的具体方法包括Xavier均匀化 初始化('xavier uniform')和He正态化 初始化('he normal')。具体的代码实现见下文
def init_weight(layer, initializer="he normal"):
if initializer == "xavier uniform":
nn.init.xavier_uniform_(layer.weight)
elif initializer == "he normal":
nn.init.kaiming_normal_(layer.weight)
(2)构建ValueNetwork类来表示价值网络,并用于估算各状态对应的值函数。该价值网络由若干个线性层构成,在接收输入状态信息后会计算相应状态的价值评估。具体的代码实现见下文。
class ValueNetwork(nn.Module):
def __init__(self, obs_dim):
super(ValueNetwork, self).__init__()
self.fc1 = nn.Linear(obs_dim, 64)
init_weight(self.fc1)
self.fc2 = nn.Linear(64, 64)
init_weight(self.fc2)
self.out_left = nn.Linear(64, 1)
self.out_straight = nn.Linear(64, 1)
self.out_right = nn.Linear(64, 1)
init_weight(self.out_left, "xavier uniform")
init_weight(self.out_straight, "xavier uniform")
init_weight(self.out_right, "xavier uniform")
def forward(self, x, command):
x = self.fc1(x)
x = torch.relu(x)
x = self.fc2(x)
x = torch.relu(x)
out_left = self.out_left(x)
out_straight = self.out_straight(x)
out_right = self.out_right(x)
x = torch.stack((out_left, out_straight, out_right), dim=0)
x = torch.gather(x, 0, command.view(1,-1,1)).view(-1, 1)
return x
(3)该类QvalueNetwork被定义为一种用于估计Q值的神经网络。该网络接收状态、动作以及命令作为输入,并输出相应的Q值。该网络主要应用于深度Q学习(DQN)等强化学习算法中,并主要用于估算在特定状态下采取某一动作所对应的Q值。
class QvalueNetwork(nn.Module):
def __init__(self, obs_dim, nb_actions=2):
super(QvalueNetwork, self).__init__()
self.fc1 = nn.Linear(obs_dim+nb_actions, 64)
init_weight(self.fc1)
self.fc2 = nn.Linear(64, 64)
init_weight(self.fc2)
self.out_left = nn.Linear(64, 1)
self.out_straight = nn.Linear(64, 1)
self.out_right = nn.Linear(64, 1)
init_weight(self.out_left, "xavier uniform")
init_weight(self.out_straight, "xavier uniform")
init_weight(self.out_right, "xavier uniform")
def forward(self, x, command, a):
x = torch.cat([x, a], dim=1)
x = self.fc1(x)
x = torch.relu(x)
x = self.fc2(x)
x = torch.relu(x)
out_left = self.out_left(x)
out_straight = self.out_straight(x)
out_right = self.out_right(x)
x = torch.stack((out_left, out_straight, out_right), dim=0)
x = torch.gather(x, 0, command.view(1,-1,1)).view(-1, 1)
return x
对上述代码的具体说明如下:
- init 函数:初始化 QvalueNetwork 类的实例。它接受两个参数:obs_dim 表示状态空间的维度和 nb_actions 表示动作空间的维度(默认为 2,通常是左、直行和右三个动作)。在这个函数中,初始化了神经网络的各个层,包括两个全连接层,用于处理状态和动作的输入,并为每个动作维度创建了输出层。
- forward 函数:前向传播函数,用于计算 Q 值。它接受输入状态 x、命令 command 和动作 a,并使用神经网络的全连接层和输出层计算 Q 值。首先,将输入状态 x 和动作 a 连接起来,形成一个扩展的输入向量。然后,通过两个全连接层进行前向传播,其中包括激活函数 ReLU。最后,根据动作命令 command 选择相应的 Q 值输出,并将其返回。
- init_weight 函数:用于初始化神经网络层的权重。根据参数 initializer 的不同取值,可以使用不同的初始化方法来初始化权重。通常使用的方法有 Xavier 初始化和 He 初始化。
(4)通过创建一个名为PolicyNetwork的神经网络类来实现策略生成的目标。该神经网络不仅能够生成动作本身,还具备计算生成动作概率密度函数的能力,在确定策略梯度方面发挥了重要作用。为了提高模型性能,在动作分布模型的选择上采用了正态分布策略。其中__init__ 函数负责初始化 PolicyNetwork 类实例,并接受两个必要参数:obs_dim 表示状态空间维度的数量,默认设置为2(通常对应左、直行和右三个动作)。在初始化过程中构建了该神经网络的所有层结构,并引入了两个全连接层来处理状态输入信息。此外,为每个动作维度单独设置了均值(mu)和标准差(log_std)输出层以实现精准的动作控制。具体的代码实现如下所示:
class PolicyNetwork(nn.Module):
def __init__(self, obs_dim, nb_actions=2):
super(PolicyNetwork, self).__init__()
self.fc1 = nn.Linear(obs_dim, 64)
init_weight(self.fc1)
self.fc2 = nn.Linear(64, 64)
init_weight(self.fc1)
self.mu_left = nn.Linear(64, nb_actions)
self.mu_straight = nn.Linear(64, nb_actions)
self.mu_right = nn.Linear(64, nb_actions)
init_weight(self.mu_left, "xavier uniform")
init_weight(self.mu_straight, "xavier uniform")
init_weight(self.mu_right, "xavier uniform")
self.log_std_left = nn.Linear(64, nb_actions)
self.log_std_straight = nn.Linear(64, nb_actions)
self.log_std_right = nn.Linear(64, nb_actions)
init_weight(self.log_std_left, "xavier uniform")
init_weight(self.log_std_straight, "xavier uniform")
init_weight(self.log_std_right, "xavier uniform")
self.nb_actions = nb_actions
定义前馈传播函数forward()函数
def forward(self, x, command):
x = self.fc1(x)
x = torch.relu(x)
x = self.fc2(x)
x = torch.relu(x)
mu_left = self.mu_left(x)
mu_straight = self.mu_straight(x)
mu_right = self.mu_right(x)
mu = torch.stack((mu_left, mu_straight, mu_right), dim=0)
mu = torch.gather(mu, 0, command.expand((-1,self.nb_actions)).view(1,-1,self.nb_actions)).view(-1,self.nb_actions)
log_std_left = self.log_std_left(x)
log_std_straight = self.log_std_straight(x)
log_std_right = self.log_std_right(x)
log_std = torch.stack((log_std_left, log_std_straight, log_std_right), dim=0)
log_std = torch.gather(log_std, 0, command.expand((-1,self.nb_actions)).view(1,-1,self.nb_actions)).view(-1,self.nb_actions)
std = log_std.clamp(min=-20, max=2).exp()
return mu, std
创建函数sample_or_likelihood用于基于当前策略生成动作或计算其概率密度函数(PDF)。该函数接收输入状态states以及命令command,并通过神经网络模型推导出动作的均值和标准差参数。随后从该概率分布中抽取一个样本动作,并计算该样本动作在该分布中的对数似然值log_prob。最后返回所抽取的动作样本及其对应的对数似然值log_prob。具体实现代码如下所示:
def sample_or_likelihood(self, states, command):
mu, std = self(states, command)
dist = torch.distributions.Normal(mu, std)
u = dist.rsample()
action = torch.tanh(u)
log_prob = dist.log_prob(value=u)
log_prob -= torch.log(1 - action ** 2 + 1e-6)
log_prob = log_prob.sum(-1, keepdim=True)
return action, log_prob
(7)创建类SACAgent以负责实施Soft Actor-Critic (SAC) 算法的强化学习代理。该代理包含关键组件如值网络、Q 值网络、策略网络以及经验回放缓冲区等。强化学习代理根据所给状态选择动作,并在之后执行训练流程以不断优化策略并提升价值函数估计。其中函数__init__用于初始化 SACAgent 类的实例参数设置。
- obs_dim: 表示状态空间维度数
- nb_actions: action_dim表示动作空间维度数
- device: device指定设备选择
- lr: learning_rate设定学习率参数
- alpha: alpha温度参数用于调节 SAC算法中的温度因子
- batch_size: batch_size设定训练阶段批次大小参数
- gamma: gamma值代表动态规划模型中的折扣因子设置
- tau: tau用于控制目标网络更新策略系数设置
- buffer_size: replay_buffer容量设定经验回放缓冲区存储容量参数
- action_clip: action_clip定义动作值缩放边界参数
- collision_percentage: collision Experience百分比指定从经验库中选取碰撞经验的比例设置
- sch_gamma和sch_steps分别设定学习率衰减因子和衰减频率参数
类SACAgent的具体实现代码如下所示。
class SACAgent:
def __init__(self, obs_dim=260, nb_actions=2, device='cpu', lr=1e-3, alpha=0.2,
batch_size=64, gamma=0.99, tau=0.005, buffer_size=10000, action_clip=(-1,1),
collision_percentage=0.2, sch_gamma = 0.9, sch_steps=500, reward_scale=1.0):
self.device = device
self.policy_network = PolicyNetwork(obs_dim, nb_actions).to(self.device)
self.q_value_network1 = QvalueNetwork(obs_dim, nb_actions).to(self.device)
self.q_value_network2 = QvalueNetwork(obs_dim, nb_actions).to(self.device)
self.value_network = ValueNetwork(obs_dim).to(self.device)
self.value_target_network = ValueNetwork(obs_dim).to(self.device)
self.hard_update()
self.value_opt = torch.optim.Adam(self.value_network.parameters(), lr=lr)
self.q_value1_opt = torch.optim.Adam(self.q_value_network1.parameters(), lr=lr)
self.q_value2_opt = torch.optim.Adam(self.q_value_network2.parameters(), lr=lr)
self.policy_opt = torch.optim.Adam(self.policy_network.parameters(), lr=lr)
self.sch_value = StepLR(self.value_opt, sch_steps, gamma=sch_gamma)
self.sch_q1 = StepLR(self.q_value1_opt, sch_steps, gamma=sch_gamma)
self.sch_q2 = StepLR(self.q_value2_opt, sch_steps, gamma=sch_gamma)
self.sch_policy = StepLR(self.policy_opt, sch_steps, gamma=sch_gamma)
self.gamma = gamma
self.tau = tau
self.alpha = alpha
self.action_clip = action_clip
self.reward_scale = reward_scale
self.collision_percentage = collision_percentage
self.batch_size = batch_size
self.buffer = ReplayBuffer(buffer_size)
self.buffer_collision = ReplayBuffer(buffer_size)
#Training variables
self.tr_step = 0
self.tr_steps_vec = []
self.avg_reward_vec = []
self.std_reward_vec = []
self.success_rate_vec = []
self.episode_nb = 0
(8)定义如下所示的5个函数:
- hard_update函数:使目标网络的参数直接复制至当前网络。
- soft_update函数:通过系数 tau 调节更新速率来实现目标网络参数与当前网络之间的软更新。
- reset_noise函数:负责归零噪声。
- store_transition函数:会把这一系列操作记录下来。
- store_transition_collision函数:当发生碰撞时的数据也会被记录下来。
具体实现代码如下所示。
def hard_update(self):
self.value_target_network.load_state_dict(self.value_network.state_dict())
def soft_update(self):
for param, target_param in zip(self.value_network.parameters(), self.value_target_network.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
def reset_noise(self):
pass
def store_transition(self, obs, action, reward, next_obs, done, info=None):
self.buffer.add((obs, action, reward, next_obs, done))
def store_transition_collision(self, obs, action, reward, next_obs, done, info=None):
self.buffer_collision.add((obs, action, reward, next_obs, done))
(9)创建一个名为select_actio的功能。该功能将基于当前状态来决定采取何种行动,并返回所选的动作。它的实现会接受状态作为输入参数,并提供可选地包含噪声的选择选项。具体的代码实现见下文。
def select_action(self, obs, noise=True):
obs = torch.from_numpy(obs).to(self.device).unsqueeze(0)
comm = obs[:, -1:].long()
obs = obs[:, :-1]
with torch.no_grad():
if noise:
action, _ = self.policy_network.sample_or_likelihood(obs, comm)
else:
action, _ = self.policy_network(obs, comm)
action = action.cpu().data.numpy().flatten()
return np.clip(action, *self.action_clip)
(10)创建名为get_batch的功能模块,在经验回放缓冲区中提取一批具有代表性的样本数据,并对其中的状态信息、动作选择以及相应的奖励信号进行分析处理;具体实现细节可见于附录部分
def get_batch(self):
col_batch_size = int(self.collision_percentage*self.batch_size)
if col_batch_size < len(self.buffer_collision):
batch_size = self.batch_size - col_batch_size
states_col, actions_col, rewards_col, next_states_col, dones_col = self.buffer_collision.sample(col_batch_size)
states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
states = np.concatenate((states_col, states))
actions = np.concatenate((actions_col, actions))
rewards = np.concatenate((rewards_col, rewards))
next_states = np.concatenate((next_states_col, next_states))
dones = np.concatenate((dones_col, dones))
else:
states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
return states, actions, rewards, next_states, dones
(11)创建函数update用于执行代理的训练流程,请注意该流程将负责以下几项任务:一是更新价值网络、Q值网络、策略网络以及目标价值网络;二是完成相应的优化步骤。具体实现代码如下所示。
def update(self, soft_update=True):
states, actions, rewards, next_states, dones = self.get_batch()
comms = states[:, -1:]
states = states[:, :-1]
next_comms = next_states[:, -1:]
next_states = next_states[:, :-1]
states = torch.from_numpy(states).float().to(self.device)
comms = torch.from_numpy(comms).long().to(self.device)
actions = torch.from_numpy(actions).float().to(self.device)
rewards = torch.from_numpy(rewards).float().to(self.device)
next_states = torch.from_numpy(next_states).float().to(self.device)
next_comms = torch.from_numpy(next_comms).long().to(self.device)
dones = torch.from_numpy(dones.astype(np.uint8)).float().to(self.device)
reparam_actions, log_probs = self.policy_network.sample_or_likelihood(states, comms)
q1 = self.q_value_network1(states, comms, reparam_actions)
q2 = self.q_value_network2(states, comms, reparam_actions)
q = torch.min(q1, q2)
target_value = q.detach() - self.alpha * log_probs.detach()
value = self.value_network(states, comms)
value_loss = F.mse_loss(value, target_value)
with torch.no_grad():
target_q = self.reward_scale*rewards + self.gamma * self.value_target_network(next_states, next_comms) * (1 - dones)
q1 = self.q_value_network1(states, comms, actions)
q2 = self.q_value_network2(states, comms, actions)
q1_loss = F.mse_loss(q1, target_q)
q2_loss = F.mse_loss(q2, target_q)
policy_loss = (self.alpha * log_probs - q).mean()
self.policy_opt.zero_grad()
policy_loss.backward()
torch.nn.utils.clip_grad_norm_(self.policy_network.parameters(), 5e-3)
self.policy_opt.step()
self.value_opt.zero_grad()
value_loss.backward()
# torch.nn.utils.clip_grad_norm_(self.value_network.parameters(), 5e-3)
self.value_opt.step()
self.q_value1_opt.zero_grad()
q1_loss.backward()
# torch.nn.utils.clip_grad_norm_(self.q_value_network1.parameters(), 5e-3)
self.q_value1_opt.step()
self.q_value2_opt.zero_grad()
q2_loss.backward()
# torch.nn.utils.clip_grad_norm_(self.q_value_network2.parameters(), 5e-3)
self.q_value2_opt.step()
if soft_update:
self.soft_update()
else:
self.hard_update()
self.sch_value.step()
self.sch_q1.step()
self.sch_q2.step()
self.sch_policy.step()
(12)实现了名为save的功能用于持久化存储代理模型的参数;并创建了一个特殊版本的功能 save_actor 专门负责存储策略网络的具体参数;同时实现了 load_actor 功能用于恢复策略网络的模型参数。具体代码实现如下所示:
def save(self, save_path):
with open(save_path, 'wb') as f:
pickle.dump(self, f)
def save_actor(self, save_path):
torch.save(self.policy_network.state_dict(), save_path)
def load_actor(self, load_path):
self.policy_network.load_state_dict(torch.load(load_path))
