Advertisement

(17-6-01)基于强化学习的自动驾驶系统:强化学习工具类+经验回放存储

阅读量:

在'reinforcement'目录下开发了多个程序文件,在其核心内容中包括一系列强化学习算法的具体实现方案。这些方案涵盖了一系列动作空间类型(包括连续型与离散型)及其对应的学习算法(如Deep Deterministic Policy Gradient, Soft Actor-Critic, Proximal Policy Optimization, 和Deep Q-Network等)。这些方案可用于多种强化学习场景,并支持研究人员与开发者在不同场景下进行强化学习的应用研究与实践。

17.7.1 强化学习工具类

创建文件reinforcement/utils.py。
该文件的功能包括提供一些广泛应用于强化学习中的工具类和函数。
这些工具类和函数主要用于强化学习算法中对智能体行为的控制以及训练过程的支持。
具体的代码实现见下文。

复制代码
 import numpy as np

    
 import torch
    
  
    
 class OUNoise:
    
     def __init__(self, mu, sigma=0.4, theta=.6, dt=0.05, x0=None, noise_decay=0.0):
    
     self.theta = theta
    
     self.mu = mu
    
     self.sigma = sigma
    
     self.dt = dt
    
     self.x0 = x0
    
     self.noise_decay = noise_decay*sigma
    
     self.reset()
    
  
    
     def __call__(self):
    
     x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
    
     self.x_prev = x
    
     return x
    
  
    
     def reset(self):
    
     self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)
    
     self.sigma = max(self.sigma-self.noise_decay, 0.01)
    
  
    
     def __repr__(self):
    
     return 'OrnsteinUhlenbeckActionNoise(mu={}, sigma={})'.format(self.mu, self.sigma)
    
     
    
 class StepLR(torch.optim.lr_scheduler._LRScheduler):
    
     def __init__(self, optimizer, step_size, gamma=0.9, last_epoch=-1, min_lr=1e-6, verbose=False):
    
     self.step_size = step_size
    
     self.gamma = gamma
    
     self.min_lr = min_lr
    
     super().__init__(optimizer, last_epoch, verbose)
    
  
    
     def get_lr(self):
    
     if (self.last_epoch == 0) or (self.last_epoch % self.step_size != 0):
    
         return [group['lr'] for group in self.optimizer.param_groups]
    
     return [max(self.min_lr, group['lr'] * self.gamma)
    
             for group in self.optimizer.param_groups]

对上述代码的具体说明如下:

  1. 类OUNoise遵循Ornstein-Uhlenbeck过程,并被设计为给智能体的动作注入随机噪声。这种机制模拟了控制环境中环境噪声的影响,并有助于维持智能体动作探索环境时的一定不确定性,从而提高其策略的学习效果。
  2. 类StepLR是一种自定义的学习率调整机制,在训练期间通过逐步减少学习速率来优化模型参数。该算法基于指定的间隔长度(step_size)和衰减系数(gamma)设计,在每个间隔周期后将当前的学习速率乘以衰减系数进行更新操作,并提供了一个可选的最低保障水平的学习速率参数(min_lr),以防过低 Learning Rate values fall below this threshold.

17.7.2 经验回放存储

创建文件reinforcement/buffer.py,并实现名为ReplayBuffer的经验回放缓冲区类。该缓冲区主要用于存储强化学习中的经验回放元组,并通过这一机制提高智能体的学习效率和训练稳定性。作为核心组件之一,在该框架中采用经验回放技术记录智能体与环境之间的互动经验,并以提高训练效率和稳定性的方式进行后续的数据处理。

复制代码
 import numpy as np

    
  
    
 class ReplayBuffer(object):
    
     """缓冲区用于存储经验回放的元组"""
    
  
    
     def __init__(self, max_size=20000):
    
     """
    
     Args:
    
         max_size (int): 存储元组的最大数量
    
     """
    
     
    
     self.storage = []  # 存储经验元组的列表
    
     self.max_size = max_size  # 缓冲区的最大容量
    
     self.ptr = 0  # 指针,用于追踪最新的元组位置
    
  
    
     def add(self, data):
    
     """添加经验元组到缓冲区
    
     (状态, 动作, 奖励, 下一个状态, 完成标志)
    
     
    
     Args:
    
         data (tuple): 经验回放元组
    
     """
    
     
    
     if len(self.storage) == self.max_size:
    
         self.storage[int(self.ptr)] = data  # 如果缓冲区已满,覆盖旧的元组数据
    
         self.ptr = (self.ptr + 1) % self.max_size
    
     else:
    
         self.storage.append(data)  # 否则,直接添加新的元组数据
    
  
    
     def __len__(self):
    
     """返回当前缓冲区的大小
    
     
    
     Returns:
    
         int: 当前缓冲区的大小
    
     """
    
     
    
     return len(self.storage)
    
  
    
     def sample(self, batch_size):
    
     """从缓冲区中随机抽样一批指定大小的经验元组
    
     
    
     Args:
    
         batch_size (int): 抽样的批量大小
    
     Returns:
    
         tuple: 状态、动作、奖励、下一个状态、完成标志
    
     """
    
     
    
     ind = np.random.choice(len(self.storage), size=batch_size, replace=False)  # 随机抽样索引
    
     states, actions, next_states, rewards, dones = [], [], [], [], []
    
  
    
     for i in ind: 
    
         s, a, r, s_, d = self.storage[i]  # 获取抽样的经验元组
    
         states.append(np.array(s, copy=False, dtype=np.float32))
    
         actions.append(np.array(a, copy=False, dtype=np.float32))
    
         next_states.append(np.array(s_, copy=False, dtype=np.float32))
    
         rewards.append(np.array(r, copy=False, dtype=np.float32))
    
         dones.append(np.array(d, copy=False, dtype=np.float32))
    
  
    
     return np.array(states), np.array(actions), np.array(rewards).reshape(-1, 1), np.array(next_states), np.array(dones).reshape(-1, 1)

对上述代码的具体说明如下:

  1. init(self, max_size=20000):初始化过程用于建立一个容量有限的经验回放缓冲区,并允许用户指定最大存储容量max_size,默认值为20 万条经验。
  2. add(self, data):该函数负责将一组包含状态(state)、动作(action)、奖励(reward)、下一个状态(next_state)以及完成标志(done)等信息的经验元组存入缓冲区。当缓存池已满时会将新数据替换掉旧数据以实现循环利用特性。
  3. len(self):此方法返回当前缓存池中积累的经验元组总数值,并即总共有多少个经验元组被存储起来了。
  4. sample(self, batch_size):此方法从当前缓存池中随机抽取一批样本供后续训练使用,并且这些样本会被系统地分解为states、actions、rewards等五个独立组件以便于模型的学习与优化工作。

未完待续

全部评论 (0)

还没有任何评论哟~