
Model-Free Reinforcement Learning for Continuous Control


Author: 禅与计算机程序设计艺术

1. Introduction

Reinforcement learning (RL) is a branch of machine learning in which an agent learns to interact with an environment through trial and error, with the goal of maximizing cumulative reward over time. Among the most popular families of RL algorithms is model-free reinforcement learning (MFRL), which requires no knowledge or model of the transition dynamics of the underlying system being controlled. These algorithms rely solely on experience collected through interaction with the environment. One of the most successful model-free approaches to continuous control problems is deep deterministic policy gradient (DDPG).

In this article, we aim to offer a comprehensive overview of the fundamental concepts and terminology of Model-Free Reinforcement Learning (MFRL) in the context of continuous control tasks. We provide a detailed explanation of the DDPG algorithm and its implementation, followed by a discussion of its applications, and we highlight some emerging research challenges in this domain. Our objective is to assist both newcomers seeking a foundational understanding of the field and researchers intent on advancing their expertise. With that, let us delve into the subject.

2. Basic Concepts and Terminology

2.1 Markov Decision Process (MDP)

The MDP framework is the mathematical formalism for sequential decision-making under uncertainty. It specifies the set of possible states, the actions available in each state, the immediate reward obtained from taking each action, and the probability distribution over next states given the current state and action.

An MDP is composed of four fundamental components: the state space S, the action set A, the state-transition probability distribution P, and the expected reward signal R. Solving an MDP typically starts from an initial state s_0. At each step, the agent selects an action a_t, receives a reward r_t, and transitions to the next state s_{t+1} according to the transition probabilities P(s_{t+1} | s_t, a_t). This sequential process continues until the agent reaches a terminal state s_T (or indefinitely, in continuing tasks). A discount factor is used to weigh the importance of future rewards relative to immediate ones.
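
To make this concrete, the sketch below (illustrative code, not taken from the original article) runs one episode of the interaction just described against a generic gym-style environment and accumulates the discounted return:

    # Schematic agent-environment interaction loop for an MDP (gym-style interface assumed)
    def run_episode(env, policy, gamma=0.99, max_steps=200):
        state = env.reset()                              # initial state s_0
        discounted_return, discount = 0.0, 1.0
        for t in range(max_steps):
            action = policy(state)                       # a_t chosen by the policy
            state, reward, done, _ = env.step(action)    # s_{t+1}, r_t drawn from P and R
            discounted_return += discount * reward       # accumulate gamma^t * r_t
            discount *= gamma
            if done:                                     # terminal state s_T reached
                break
        return discounted_return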

2.2 Policy

A policy is a mapping from the state space to the action space: it specifies which action should be taken in each state. For example, if the goal is to move a cart-pole, a reasonable policy might be to keep the pole balanced as well as possible while moving toward the center of the track. Another policy might select a random action with a small probability in every state, in order to ensure exploration.
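
As a toy illustration (not from the original article), a policy can be written as a plain function from state to action, and an epsilon-random variant mixes a deterministic rule with occasional random actions to keep exploring:

    import random

    # A hand-written deterministic policy for a cart-pole-like state
    # (state = [position, velocity, angle, angular_velocity]; purely illustrative)
    def balance_policy(state):
        angle = state[2]
        return 1.0 if angle > 0 else -1.0       # push toward the side the pole is leaning

    # The same policy with a small chance of acting randomly, to ensure exploration
    def epsilon_random_policy(state, epsilon=0.1):
        if random.random() < epsilon:
            return random.choice([-1.0, 1.0])
        return balance_policy(state)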

2.3 Value Function

The value function V(s) represents the expected long-term return obtained from state s, i.e., the accumulated expected reward over many steps when acting according to the current policy π; the optimal value function is the maximum of this quantity over all policies. The Bellman equation provides a recursive way to estimate the value function by relating the value of a state to the immediate reward and the discounted value of its successor states.

In its optimality form, the Bellman equation reads V(s) = E[ r + gamma * max_{a'} Q(s', a') | s ], where Q(s', a') is the expected total reward obtained after taking action a' in the next state s'. Here gamma is the discount factor, which controls the weight of future rewards relative to the current reward: values of gamma close to 1 place more weight on later rewards, whereas smaller values make the agent focus on immediate rewards.
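
The snippet below (illustrative numbers only) shows how a one-step Bellman backup combines an immediate reward with the discounted value of the best next action, and how the choice of gamma shifts the balance between immediate and future rewards:

    # One-step Bellman backup: V(s) = E[ r + gamma * max_a' Q(s', a') ]
    def bellman_backup(reward, next_q_values, gamma=0.99):
        return reward + gamma * max(next_q_values)

    next_q_values = [0.5, 2.0, 1.0]                            # hypothetical Q(s', a') estimates
    print(bellman_backup(1.0, next_q_values, gamma=0.99))      # future-oriented: 2.98
    print(bellman_backup(1.0, next_q_values, gamma=0.10))      # myopic:          1.20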

2.4 Deep Deterministic Policy Gradient (DDPG) Algorithm

DDPG is a model-free, off-policy actor-critic method built on deep neural networks. It combines ideas from DQN (Deep Q-Networks) with the deterministic policy gradient. DDPG uses two separate neural networks, the Actor network and the Critic network: the Actor outputs the action to take in a given state, while the Critic estimates the value of taking that action in that state.

The Actor network is trained to represent a deterministic policy π that maximizes the expected total reward from any given state s. At each step t, the Actor computes an action a_t = π(s_t) using the most recent weights θ^π (exploration noise is typically added to this action). The action is executed in the environment to obtain the next state s_{t+1}, the reward r_t, and a flag indicating whether the episode has ended. The Actor's parameters θ^π are then updated via backpropagation, following the gradient of the Critic's value estimate with respect to the chosen action.

The Critic network estimates the action-value function Q(s, a) and is trained on the temporal-difference (TD) error. At each timestep t, the Critic compares its current estimate with a bootstrapped target built from the observed reward and the value of the next state: if the outcome was better than predicted, the TD error is positive; if it was worse, the TD error is negative. These errors are used to update the Critic's parameters θ^Q.

The two networks are trained jointly from minibatches drawn from an experience replay buffer, which decorrelates the data that is collected incrementally during online learning; slowly updated target copies of both networks are used to stabilize the bootstrapped targets.
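
Putting these pieces together, one DDPG training step can be summarized as follows (schematic pseudocode; the full PyTorch implementation is given in Section 3):

    # One DDPG update step (schematic)
    # 1. Sample a minibatch (s, a, r, s') from the replay buffer.
    # 2. Critic target:      y = r + gamma * Q_target(s', pi_target(s'))
    # 3. Critic loss:        L = mean((Q(s, a) - y)^2)      -> gradient step on theta_Q
    # 4. Actor loss:         J = -mean(Q(s, pi(s)))         -> gradient step on theta_pi
    # 5. Soft target update: theta_target <- tau * theta + (1 - tau) * theta_target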

3. DDPG Implementation Details

Let us now implement the DDPG algorithm using the PyTorch library.

First, we define the Environment class, which models the physical system under control. Here, we set up a simple swing-up environment: a point mass on a massless rod, driven by a torque at the pivot. The equations of motion are those of a torque-driven pendulum, with some simplifications. Our objective is to train the agent to swing the pendulum up from the hanging position and hold it upright with zero angular velocity. The observation consists of the angle from the vertical (encoded as its cosine and sine) and the angular velocity about the pivot axis, and the action is the torque applied at the pivot.

    import numpy as np
    import torch
    from gym import Env


    class SwingUpEnv(Env):
        """Torque-driven pendulum swing-up task (frictionless, Euler-integrated)."""

        def __init__(self):
            super().__init__()

            # Define system parameters
            self.masspole = 0.1      # mass of the pendulum bob (kg)
            self.length = 0.5        # length of the (massless) rod (m)
            self.gravity = 10.0      # magnitude of gravity (m/s^2)

            # Define hyperparameters
            self.tau = 0.02          # seconds between state updates

        def step(self, action):
            """Advance the pendulum by one time step using Euler's method."""
            th, thdot = self.state   # th := theta (angle from the upright vertical)
                                     # thdot := angular velocity about the pivot axis

            u = float(np.asarray(action).flatten()[0])   # applied torque (N m);
            self.last_u = u                              # the actor later bounds it via Tanh

            g, m, l = self.gravity, self.masspole, self.length
            # Angular acceleration of a torque-driven pendulum (theta measured from upright):
            #   th_ddot = 3 g / (2 l) * sin(th) + 3 / (m l^2) * u
            newthdot = thdot + (3 * g / (2 * l) * np.sin(th) + 3.0 / (m * l ** 2) * u) * self.tau
            newth = th + newthdot * self.tau
            self.state = np.array([newth, newthdot])

            observation = np.array([np.cos(newth), np.sin(newth), newthdot])
            reward = np.cos(newth)   # +1 when upright, -1 when hanging straight down
            done = False             # the environment never terminates an episode itself
            info = {}

            return observation, reward, done, info

        def reset(self):
            """Reset the pendulum to hang straight down with zero angular velocity."""
            self.state = np.array([np.pi, 0.0])
            self.last_u = None
            return np.array([np.cos(self.state[0]), np.sin(self.state[0]), self.state[1]])

        def render(self):
            pass

Next, we define the Actor and Critic networks. Both use three fully connected layers with ReLU activations; the Actor ends with a Tanh so that its output is a bounded action.

    import torch.nn as nn


    class ActorNet(nn.Module):
        """Maps an observation to an action in [-1, 1]."""

        def __init__(self, num_inputs, num_outputs):
            super(ActorNet, self).__init__()

            self.net = nn.Sequential(
                nn.Linear(num_inputs, 400),
                nn.ReLU(),
                nn.Linear(400, 300),
                nn.ReLU(),
                nn.Linear(300, num_outputs),
                nn.Tanh()   # bounds the action to [-1, 1]
            )

        def forward(self, x):
            return self.net(x)


    class CriticNet(nn.Module):
        """Maps an (observation, action) pair to a scalar Q-value."""

        def __init__(self, num_inputs, num_actions):
            super(CriticNet, self).__init__()

            self.net = nn.Sequential(
                nn.Linear(num_inputs + num_actions, 400),
                nn.ReLU(),
                nn.Linear(400, 300),
                nn.ReLU(),
                nn.Linear(300, 1)
            )

        def forward(self, xs, a):
            cat_input = torch.cat((xs, a), dim=1)
            qvalue = self.net(cat_input)
            return qvalue
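
As a quick sanity check (a hypothetical snippet, not part of the original code), we can instantiate the two networks with the 3-dimensional observation and 1-dimensional action used later and confirm the output shapes:

    # Hypothetical shape check for the networks defined above
    actor = ActorNet(num_inputs=3, num_outputs=1)
    critic = CriticNet(num_inputs=3, num_actions=1)

    obs = torch.zeros(8, 3)      # a dummy batch of 8 observations
    act = actor(obs)             # shape (8, 1), values in [-1, 1]
    q = critic(obs, act)         # shape (8, 1), one Q-value per (obs, action) pair
    print(act.shape, q.shape)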

We also need to define the DDPGAgent class, which provides methods for selecting actions, training the networks, updating the target weights, computing target Q values, and computing the loss functions. The agent stores its transitions in a replay buffer; the original listing does not include the buffer itself, so a minimal sketch is given below.
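
The Buffer class referenced by the agent is not shown in the original listing; the following is a minimal sketch of a FIFO replay buffer, written (as an assumption) to match the interface the agent uses: add(experience), sample(), and len(). sample() returns the minibatch as five float tensors in the order expected by train_model.

    import random
    from collections import deque

    import numpy as np
    import torch


    class Buffer:
        """Minimal FIFO experience replay buffer (assumed implementation)."""

        def __init__(self, buffer_size, batch_size, seed=0):
            self.memory = deque(maxlen=buffer_size)   # stores (s, a, r, s', done) tuples
            self.batch_size = batch_size
            random.seed(seed)

        def add(self, experience):
            """Store one (state, action, reward, next_state, done) transition."""
            self.memory.append(experience)

        def sample(self):
            """Draw a random minibatch and return it as float tensors."""
            batch = random.sample(self.memory, self.batch_size)
            states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
            return (torch.as_tensor(states, dtype=torch.float32),
                    torch.as_tensor(actions, dtype=torch.float32).reshape(self.batch_size, -1),
                    torch.as_tensor(rewards, dtype=torch.float32),
                    torch.as_tensor(next_states, dtype=torch.float32),
                    torch.as_tensor(dones, dtype=torch.float32))

        def __len__(self):
            return len(self.memory)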

    import copy

    import numpy as np
    import torch
    import torch.nn.functional as F
    import torch.optim as optim


    def soft_update(target_net, source_net, tau):
        """Polyak-average the online network parameters into the target network."""
        for t_param, s_param in zip(target_net.parameters(), source_net.parameters()):
            t_param.data.copy_(tau * s_param.data + (1.0 - tau) * t_param.data)


    class DDPGAgent():
        def __init__(self, env):
            self.env = env

            # Initialize actor and critic networks
            # (the observation is [cos(theta), sin(theta), theta_dot], hence 3 inputs)
            self.actor = ActorNet(num_inputs=3, num_outputs=1)
            self.critic = CriticNet(num_inputs=3, num_actions=1)

            # Load saved models if they exist
            try:
                self.actor.load_state_dict(torch.load('model/ddpg_actor.pt'))
                self.critic.load_state_dict(torch.load('model/ddpg_critic.pt'))
            except FileNotFoundError:
                print("Trained models not found.")

            # Use CUDA if a GPU is available
            device = "cuda" if torch.cuda.is_available() else "cpu"
            self.device = torch.device(device)
            self.actor.to(self.device)
            self.critic.to(self.device)

            # Target networks start as copies of the online networks
            self.target_actor = copy.deepcopy(self.actor)
            self.target_critic = copy.deepcopy(self.critic)

            # Create optimizer objects for both networks
            self.actor_optimizer = optim.Adam(self.actor.parameters())
            self.critic_optimizer = optim.Adam(self.critic.parameters())

            # Define replay buffer object
            self.buffer = Buffer(buffer_size=100000, batch_size=64, seed=0)

        def act(self, state, noise_scale=0.0):
            """Select an action from the current policy, with optional Gaussian exploration noise."""
            state_t = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
            with torch.no_grad():
                action = self.actor(state_t).cpu().numpy().flatten()
            action += noise_scale * np.random.randn(*action.shape)
            return np.clip(action, -1.0, 1.0)   # the Tanh-bounded output is used directly as the torque

        def compute_target_qvals(self, next_states, rewards):
            """Compute the target Q values used for training.

            Done flags are ignored here because SwingUpEnv never terminates on its own.
            """
            with torch.no_grad():
                # Q-values predicted by the target critic for the next states
                targets = self.target_critic(next_states, self.target_actor(next_states)).squeeze(-1)

                # Bootstrapped target: r + gamma * Q_target(s', pi_target(s'))
                targets = rewards + GAMMA * targets

                # Add a small amount of noise to the targets (a simple regularizer)
                targets += NOISE * torch.randn(targets.shape, device=self.device)

            return targets

        def train_model(self, experiences):
            """Train the actor and critic networks on one minibatch."""
            # Move the sampled minibatch to the training device
            obs_batch, acts_batch, rews_batch, next_obs_batch, dones_batch = \
                [x.to(self.device) for x in experiences]

            # Train the critic network on the TD error
            pred_qs = self.critic(obs_batch, acts_batch).squeeze(-1)
            targets = self.compute_target_qvals(next_obs_batch, rews_batch)
            critic_loss = F.mse_loss(pred_qs, targets)
            self.critic_optimizer.zero_grad()
            critic_loss.backward()
            self.critic_optimizer.step()

            # Train the actor network to maximize the critic's estimate
            pi = self.actor(obs_batch)
            qs = self.critic(obs_batch, pi)
            actor_loss = -torch.mean(qs)
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # Soft-update the target networks
            soft_update(self.target_actor, self.actor, TAU)
            soft_update(self.target_critic, self.critic, TAU)

        def update_weights(self, experiences):
            """Add the experience to the buffer and train once enough samples are stored."""
            self.buffer.add(experiences)

            if len(self.buffer) >= MINIBATCH_SIZE:
                mini_batch = self.buffer.sample()
                self.train_model(mini_batch)

        def save_models(self):
            """Save the trained models."""
            torch.save(self.actor.state_dict(), 'model/ddpg_actor.pt')
            torch.save(self.critic.state_dict(), 'model/ddpg_critic.pt')

Finally, we assemble all the components into a main script that runs the simulation and trains the agent. Once training is complete, we plot the results and assess the agent's performance.

    import os
    from collections import deque

    import matplotlib.pyplot as plt
    import numpy as np


    if __name__ == "__main__":
        MAX_EPISODES = 1000         # Maximum number of episodes to run
        MAX_STEPS = 200             # Steps per episode (the environment never signals done itself)
        EPSILON = 1.0               # Initial scale of the exploration noise
        DECAY_RATE = 0.99           # Per-episode decay rate for the exploration noise
        EPSILON_MIN = 0.01          # Minimum exploration noise scale
        BUFFER_SIZE = int(1e6)      # Size of the replay buffer
        LR_ACTOR = 1e-4             # Learning rate for the actor
        LR_CRITIC = 1e-3            # Learning rate for the critic
        WEIGHT_DECAY = 0            # L2 weight decay
        TAU = 0.001                 # Soft target-update parameter
        GAMMA = 0.99                # Discount factor
        NOISE = 0.1                 # Noise added to the target Q values
        UPDATE_FREQ = 1             # How often to update the networks (1 = every step)
        MINIBATCH_SIZE = 1024       # Stored transitions required before training starts
        # (BUFFER_SIZE, LR_ACTOR, LR_CRITIC, WEIGHT_DECAY and UPDATE_FREQ are kept for
        #  reference; the agent defined above uses Adam's defaults and its own buffer size,
        #  and performs one update per environment step.)

        # Create environment and agent
        env = SwingUpEnv()
        agent = DDPGAgent(env)
        os.makedirs('model', exist_ok=True)      # directory for the saved weights

        scores = []                              # Score per episode
        scores_window = deque(maxlen=100)        # Last 100 scores
        epsilons = []                            # Exploration noise scale per episode
        avg_scores = []                          # Running average over the last 100 episodes

        # Main loop
        for i_episode in range(MAX_EPISODES):
            state = env.reset()
            score = 0                            # Score for this episode

            # Exploration noise scale decays geometrically with the episode index
            epsilon = max(EPSILON_MIN, EPSILON * DECAY_RATE ** i_episode)

            for t in range(MAX_STEPS):
                # Select an action from the current policy with exploration noise
                action = agent.act(state, epsilon)

                # Take the action in the environment and receive the new state and reward
                next_state, reward, done, _ = env.step(action)

                # Store the transition and perform one optimization step once the
                # buffer holds enough samples
                agent.update_weights((state, action, reward, next_state, done))

                # Update the state and add the reward to the cumulative score
                state = next_state
                score += reward

                if done:
                    break

            # Append the score and the noise scale to their corresponding lists
            scores.append(score)
            scores_window.append(score)
            epsilons.append(epsilon)

            average_score = np.mean(scores_window)
            avg_scores.append(average_score)

            # Print information every 100 episodes
            if i_episode % 100 == 0:
                print('\rEpisode {}\tAverage Score: {:.2f}\tepsilon: {:.2f}'.format(
                    i_episode, average_score, epsilon))

            # Save the trained models every 500 episodes
            if i_episode % 500 == 0:
                agent.save_models()

        # Plot the running average score
        fig = plt.figure()
        ax = fig.add_subplot(111)
        plt.plot(np.arange(len(avg_scores)), avg_scores)
        plt.ylabel('Average Score')
        plt.xlabel('Episode #')
        plt.show()
