
(17-6-02) Reinforcement-Learning-Based Autonomous Driving System: The Deep Reinforcement Learning Agent


17.7.3 The Deep Reinforcement Learning Agent

Create the file agent.py in the project directory reinforcement to implement a deep reinforcement learning agent. The agent follows an Actor-Critic design and is trained with the Twin Delayed Deep Deterministic Policy Gradient (TD3) algorithm. Its core components are an actor module, a critic module, and an environment model, tied together by predefined training and update routines. The implementation proceeds as follows.

(1) Create a class named OUNoise that implements an Ornstein-Uhlenbeck process, a kind of noise commonly used to model systems with temporally correlated randomness. The process was originally introduced in statistical physics to describe the velocity of a particle undergoing Brownian motion, and has since been widely applied in finance, control systems, and deep reinforcement learning. To encourage exploration, class OUNoise provides the following functionality (its discrete update rule is given right after this list):

  1. Configure the generator parameters to produce noise with the specified characteristics.
  2. Reset the generator state to the specified mean.
  3. Update the generator state and return the current noise sample.
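
For reference, the discrete-time state update implemented by the sample() method below can be written as

$$x_{t+1} = x_t + \theta\,(\mu - x_t) + \sigma\,\varepsilon_t, \qquad \varepsilon_t \sim \mathcal{N}(0, I),$$

so the noise is continually pulled back toward the mean $\mu$ at rate $\theta$ while being perturbed by Gaussian noise of scale $\sigma$.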

The implementation of class OUNoise is shown below.

# Required imports for agent.py (Actor, TwinCritic, Environment, and ReplayBuffer
# are assumed to be provided by the project's other modules).
import copy
import pickle

import numpy as np
import torch
import torch.nn.functional as F


class OUNoise:
    """Ornstein-Uhlenbeck process used to generate temporally correlated exploration noise."""

    def __init__(self, size, mu=0., theta=0.6, sigma=0.2):
        """Initialize the noise process parameters and internal state."""
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        """Reset the internal state (= noise) to the mean (mu)."""
        self.state = copy.copy(self.mu)

    def sample(self):
        """Update the internal state and return it as a noise sample."""
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * np.array([np.random.randn() for i in range(len(x))])
        self.state = x + dx
        return self.state
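
As a quick illustration of how this noise is typically consumed (a minimal sketch, not part of the project code; the action dimension of 2 matches the agent defined later in this section):

# Hypothetical usage sketch: perturb a deterministic action with OU noise.
noise_gen = OUNoise(size=2, sigma=0.1)
noise_gen.reset()                      # call at the start of each episode
action = np.array([0.1, 0.5])          # example deterministic policy output
noisy_action = np.clip(action + noise_gen.sample(), -1.0, 1.0)
print(noisy_action)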

(2) Develop a class named StepLR, a custom learning-rate scheduler used to adjust the learning rates of the optimizers. The class provides the following methods:

  1. __init__(): initialize the scheduler parameters.
  2. get_lr(self): return the learning rate for each parameter group.

The implementation of class StepLR is shown below.

class StepLR(torch.optim.lr_scheduler._LRScheduler):
    """Decay the learning rate by gamma every step_size steps, with a lower bound of min_lr."""

    def __init__(self, optimizer, step_size, gamma=0.9, last_epoch=-1, min_lr=1e-6, verbose=False):
        self.step_size = step_size
        self.gamma = gamma
        self.min_lr = min_lr
        super().__init__(optimizer, last_epoch, verbose)

    def get_lr(self):
        # Keep the current learning rate except on every step_size-th step
        if (self.last_epoch == 0) or (self.last_epoch % self.step_size != 0):
            return [group['lr'] for group in self.optimizer.param_groups]
        # Decay by gamma, but never below min_lr
        return [max(self.min_lr, group['lr'] * self.gamma)
                for group in self.optimizer.param_groups]
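
A minimal usage sketch (the model and optimizer here are placeholders for illustration, not part of the project code): the scheduler is stepped once per training iteration and multiplies the learning rate by gamma every step_size steps, never dropping below min_lr.

# Hypothetical usage sketch for the custom StepLR scheduler.
model = torch.nn.Linear(4, 2)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = StepLR(optimizer, step_size=350, gamma=0.9, min_lr=1e-6)

for step in range(1000):
    loss = model(torch.randn(8, 4)).pow(2).mean()   # dummy loss for illustration
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    scheduler.step()   # learning rate decays by 0.9 every 350 steps, floored at 1e-6

print(optimizer.param_groups[0]['lr'])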

(3) Develop a class named TD3ColDeductiveAgent, which provides the following methods:

  1. __init__(self, ...): initialize the agent's parameters and models.
  2. select_action(self, obs, prev_action, eval=False): select an action given the observation, the previous action, and whether the agent is in evaluation mode.
  3. reset_noise(self): reset the action-noise state.
  4. store_transition(self, p_act, obs, act, rew, next_obs, done): store an experience tuple.
  5. update_step(self, it, is_pretraining): perform one update step of the agent's models.
  6. _compute_bc_loss(self, obs, act, p_act): compute the behavior-cloning loss.
  7. _compute_critic_loss(self, obs, act, rew, next_obs, done): compute the critic loss.
  8. _compute_actor_loss(self, obs, p_act): compute the actor loss.
  9. _compute_env_loss(self, obs, p_act): compute the environment-model rollout loss.
  10. _update_env_model(self, obs, act, rew, next_obs): update the environment model parameters.
  11. change_opt_lr(self, actor_lr, critic_lr): adjust the optimizers' learning rates.
  12. load_exp_buffer(self, data): load a dataset into the expert experience buffer.
  13. save(self, save_path): save the agent's model and parameters.

The implementation of class TD3ColDeductiveAgent is shown below.

class TD3ColDeductiveAgent:
    """TD3-based agent combining expert demonstrations with a learned environment model."""

    def __init__(self, obs_size=256, device='cpu', actor_lr=1e-3, critic_lr=1e-3,
                 pol_freq_update=2, policy_noise=0.2, noise_clip=0.5, act_noise=0.1, gamma=0.99,
                 tau=0.005, l2_reg=1e-5, env_steps=8, env_w=0.2, lambda_bc=0.1, lambda_a=0.9, lambda_q=1.0,
                 exp_buff_size=20000, actor_buffer_size=20000, exp_prop=0.25, batch_size=64,
                 scheduler_step_size=350, scheduler_gamma=0.9):
        assert device in ['cpu', 'cuda'], "device must be either 'cpu' or 'cuda'"

        # Actor network, its target copy, optimizer and learning-rate scheduler
        self.actor = Actor(obs_size).to(device)
        self.actor_target = Actor(obs_size).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr, weight_decay=l2_reg)
        self.actor_scheduler = StepLR(self.actor_optimizer,
                                      step_size=scheduler_step_size,
                                      gamma=scheduler_gamma)

        # Twin critic, its target copy, optimizer and learning-rate scheduler
        self.critic = TwinCritic(obs_size).to(device)
        self.critic_target = TwinCritic(obs_size).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr, weight_decay=l2_reg)
        self.critic_scheduler = StepLR(self.critic_optimizer,
                                       step_size=scheduler_step_size,
                                       gamma=scheduler_gamma)

        # Learned environment (transition + reward) model
        self.env_model = Environment(obs_size).to(device)
        self.env_model_optimizer = torch.optim.Adam(self.env_model.parameters(), lr=1e-3)

        # TD3 hyperparameters and loss weights
        self.pol_freq_update = pol_freq_update
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip
        self.act_noise = act_noise
        self.gamma = gamma
        self.tau = tau
        self.env_steps = env_steps
        self.env_w = env_w
        self.lambda_a = lambda_a
        self.lambda_bc = lambda_bc
        self.lambda_q = lambda_q

        # Per-dimension action bounds
        self.policy_clip_min = torch.tensor([-0.75, -1.0]).to(device)
        self.policy_clip_max = torch.tensor([0.75, 1.0]).to(device)

        self.policy_clip_max_np = self.policy_clip_max.cpu().numpy()
        self.policy_clip_min_np = self.policy_clip_min.cpu().numpy()

        # Separate buffers for expert demonstrations and the agent's own experience
        self.expert_buffer = ReplayBuffer(exp_buff_size, device)
        self.actor_buffer = ReplayBuffer(actor_buffer_size, device)
        self.batch_size = batch_size
        self.exp_batch_size = int(exp_prop * batch_size)
        self.actual_batch_size = batch_size - self.exp_batch_size

        self.device = device

        self.ou_noise = OUNoise(2, sigma=act_noise)

        # Training variables
        self.pre_tr_step = 0
        self.change_lr = True
        self.tr_step = 0
        self.tr_steps_vec = []
        self.avg_reward_vec = []
        self.std_reward_vec = []
        self.success_rate_vec = []
        self.episode_nb = 0

    def select_action(self, obs, prev_action, eval=False):
        emb, command = obs
        emb = torch.FloatTensor(emb.reshape(1, -1)).to(self.device)
        command = torch.tensor(command).reshape(1, 1).to(self.device)
        prev_action = torch.FloatTensor(prev_action).reshape(1, -1).to(self.device)

        with torch.no_grad():
            action = self.actor(emb, command, prev_action).cpu().numpy().flatten()
        if not eval:
            # Add Ornstein-Uhlenbeck exploration noise and clip to the action bounds
            # noise = np.random.normal(0, self.act_noise, size=action.shape)
            noise = self.ou_noise.sample()
            action = (action + noise).clip(self.policy_clip_min_np, self.policy_clip_max_np)
        return action.tolist()

    def reset_noise(self):
        self.ou_noise.reset()

    def store_transition(self, p_act, obs, act, rew, next_obs, done):
        self.actor_buffer.store_transition((p_act, obs, act, rew, next_obs, done))

    def update_step(self, it, is_pretraining):
        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()

        if is_pretraining:
            # Pretraining: sample expert demonstrations only
            p_act_exp, obs_exp, act_exp, rew_exp, next_obs_exp, done_exp = self.expert_buffer.sample(self.batch_size)
            obs = obs_exp
            next_obs = next_obs_exp
            act = act_exp
            rew = rew_exp

            critic_loss = self._compute_critic_loss(obs_exp, act_exp, rew_exp, next_obs_exp, done_exp)
        else:
            # Regular training: mix expert and agent experience in a single batch
            p_act_exp, obs_exp, act_exp, rew_exp, next_obs_exp, done_exp = self.expert_buffer.sample(self.exp_batch_size)
            p_act_act, obs_act, act_act, rew_act, next_obs_act, done_act = self.actor_buffer.sample(self.actual_batch_size)

            emb_exp, command_exp = obs_exp
            emb_act, command_act = obs_act
            obs = (torch.cat((emb_exp, emb_act), dim=0), torch.cat((command_exp, command_act), dim=0))
            p_act = torch.cat((p_act_exp, p_act_act), dim=0)
            act = torch.cat((act_exp, act_act), dim=0)
            rew = torch.cat((rew_exp, rew_act), dim=0)
            next_emb_exp, next_command_exp = next_obs_exp
            next_emb_act, next_command_act = next_obs_act
            next_obs = (torch.cat((next_emb_exp, next_emb_act), dim=0), torch.cat((next_command_exp, next_command_act), dim=0))
            done = torch.cat((done_exp, done_act), dim=0)

            critic_loss = self.lambda_q * self._compute_critic_loss(obs, act, rew, next_obs, done)

        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 2.0)
        self.critic_optimizer.step()
        self.critic_scheduler.step()

        # Delayed policy update: the actor is updated every pol_freq_update critic updates
        if it % self.pol_freq_update == 0:
            if is_pretraining:
                actor_loss = self._compute_bc_loss(obs_exp, act_exp, p_act_exp)
            else:
                actor_loss = self.lambda_bc * self._compute_bc_loss(obs_exp, act_exp, p_act_exp)
                actor_loss += self.lambda_a * self._compute_actor_loss(obs, p_act)
                actor_loss += self.env_w * self._compute_env_loss(obs, p_act)

            actor_loss.backward()
            torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 2.0)
            self.actor_optimizer.step()
            # Update the frozen target models (Polyak averaging)
            for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

            self.actor_scheduler.step()

        self._update_env_model(obs, act, rew, next_obs)

    def _compute_bc_loss(self, obs, act, p_act):
        # Behavior cloning: mean squared error between the policy output and the expert action
        emb, command = obs
        pi_s = self.actor(emb, command, p_act)
        return torch.mean((pi_s - act) ** 2)

    def _compute_critic_loss(self, obs, act, rew, next_obs, done):
        emb, command = obs
        emb_, command_ = next_obs
        # Target policy smoothing: clipped noise added to the target action
        noise = torch.randn_like(act) * self.policy_noise
        noise = noise.clamp(-self.noise_clip, self.noise_clip).to(self.device)
        next_act = (self.actor_target(emb_, command_, act) + noise).clamp(self.policy_clip_min, self.policy_clip_max)

        # Clipped double-Q target
        q1, q2 = self.critic_target(emb_, command_, next_act)
        q = torch.min(q1, q2)
        q_target = rew + (self.gamma * (1 - done) * q).detach()

        current_q1, current_q2 = self.critic(emb, command, act)

        critic_loss = F.mse_loss(current_q1, q_target) + F.mse_loss(current_q2, q_target)

        return critic_loss

    def _compute_actor_loss(self, obs, p_act):
        # Deterministic policy gradient: maximize Q1 under the current policy
        emb, command = obs
        loss = -self.critic.critic1(emb, command, self.actor(emb, command, p_act)).mean()
        return loss

    def _compute_env_loss(self, obs, p_act):
        # Roll the learned environment model forward and maximize the discounted predicted reward
        emb, command = obs
        action = self.actor(emb, command, p_act)
        loss = 0
        for i in range(self.env_steps):
            emb, rew_pred = self.env_model(emb, action)
            loss += self.gamma ** i * rew_pred
            action = self.actor(emb, command, action)
        return -loss.mean()

    def _update_env_model(self, obs, act, rew, next_obs):
        emb, _ = obs
        next_emb, _ = next_obs

        self.env_model_optimizer.zero_grad()
        transition = self.env_model.transition_model(emb, act)
        reward = self.env_model.reward_model(emb, act, next_emb)
        t_loss = F.mse_loss(transition, next_emb)
        r_loss = F.mse_loss(reward, rew)
        loss = t_loss + r_loss
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.env_model.parameters(), 2.0)
        self.env_model_optimizer.step()

    def change_opt_lr(self, actor_lr, critic_lr):
        for param_group in self.actor_optimizer.param_groups:
            param_group['lr'] = actor_lr
        for param_group in self.critic_optimizer.param_groups:
            param_group['lr'] = critic_lr

    def load_exp_buffer(self, data):
        for d in data:
            self.expert_buffer.store_transition(d)

    def save(self, save_path):
        with open(save_path, 'wb') as f:
            pickle.dump(self, f)

In the above code, TD3ColDeductiveAgent defines a reinforcement learning agent built on an extended version of the Twin Delayed Deep Deterministic Policy Gradient (TD3) algorithm, modified to improve training stability and efficiency. Its main components are summarized below.

  1. Network and optimizer initialization:
    During initialization, the agent builds the actor and critic networks together with their target copies (actor_target and critic_target), and creates the corresponding optimizers (actor_optimizer, critic_optimizer, and env_model_optimizer).
  2. Action selection:
    The select_action method chooses the next action from the current observation and the previous action; during training, exploration noise can be added to encourage exploration.
  3. Experience storage:
    The agent maintains two replay buffers: expert_buffer stores transitions produced by the expert policy, while actor_buffer stores transitions collected by the agent's own policy.
  4. Training procedure:
    The update_step method performs one training step. In pretraining mode it uses the expert data with a behavior-cloning actor loss, while in regular training mode it optimizes a weighted combination of losses (see the formulas after this list).
  5. Loss functions:
    Three kinds of losses are used: the actor losses (behavior cloning, deterministic policy gradient, and environment-model rollout), the critic loss, and the environment-model loss; they are combined with the configured weights depending on the training phase.
  6. Environment model update:
    The agent updates the environment model from the current observation, the action taken, the reward, and the next observation.
  7. Learning-rate management:
    The change_opt_lr method sets new learning rates for the actor and critic optimizers.
  8. Loading expert experience:
    The load_exp_buffer method fills the expert buffer with a dataset of expert transitions.
  9. Model saving:
    The save method serializes the entire agent, including its parameters and training state, in one call.
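
For item 4, the core TD3 quantities computed in update_step and _compute_critic_loss can be written compactly as

$$\tilde a = \operatorname{clip}\!\big(\pi_{\theta'}(s') + \operatorname{clip}(\epsilon, -c, c),\, a_{\min},\, a_{\max}\big), \qquad \epsilon \sim \mathcal{N}(0, \sigma_{\text{policy}}^2),$$

$$y = r + \gamma\,(1 - d)\,\min\big(Q_{\phi_1'}(s', \tilde a),\, Q_{\phi_2'}(s', \tilde a)\big),$$

and the target networks are updated by Polyak averaging, $\theta' \leftarrow \tau\,\theta + (1 - \tau)\,\theta'$, with $\tau = 0.005$ by default.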

Overall, TD3ColDeductiveAgent is a self-contained deep reinforcement learning agent that integrates actor-critic learning, experience replay with expert demonstrations, exploration noise, and a learned environment model, which allows it to be trained for driving tasks in complex simulated environments.
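
To make the intended workflow concrete, the following sketch shows how such an agent could be driven from a training loop. The environment interface (reset/step), the expert_transitions dataset, and the episode structure are assumptions for illustration only; they are not part of the code above.

# Hypothetical training-loop sketch around TD3ColDeductiveAgent.
agent = TD3ColDeductiveAgent(obs_size=256, device='cpu')
agent.load_exp_buffer(expert_transitions)          # assumed list of (p_act, obs, act, rew, next_obs, done)

# Pretraining on expert data only
for it in range(1000):
    agent.update_step(it, is_pretraining=True)

# Online training
for episode in range(500):
    obs = env.reset()                              # assumed environment API
    prev_action = [0.0, 0.0]
    agent.reset_noise()
    done = False
    it = 0
    while not done:
        action = agent.select_action(obs, prev_action)
        next_obs, reward, done = env.step(action)  # assumed environment API
        agent.store_transition(prev_action, obs, action, reward, next_obs, done)
        agent.update_step(it, is_pretraining=False)
        obs, prev_action = next_obs, action
        it += 1

agent.save('td3_agent.pkl')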

To be continued.
