Model-Free Reinforcement Learning for Continuous Control
Author: 禅与计算机程序设计艺术
1. Introduction
Reinforcement learning (RL) is a type of machine learning in which an agent learns how to interact with an environment through trial-and-error actions, with the goal of maximizing reward over time. Among the most popular families of RL algorithms is model-free reinforcement learning (MFRL), which does not require knowledge or a model of the transition dynamics between states or of the underlying system being controlled. These algorithms rely solely on experience collected through interaction with the environment. One of the most promising approaches for addressing continuous control problems is deep deterministic policy gradient (DDPG).
In this article, we aim to offer a comprehensive overview of the fundamental concepts and terminology associated with Model-Free Reinforcement Learning (MFRL) in the context of continuous control tasks. A detailed explanation of the DDPG algorithm and its implementation will be provided, followed by an examination of its various applications. Additionally, we will highlight some emerging research challenges in this domain. Our objective is to assist both newcomers seeking a foundational understanding of the field and researchers looking to advance their expertise. With that, we are ready to delve into the subject.
2. Basic Concepts and Terminology
2.1 Markov Decision Process (MDP)
The MDP framework mathematically describes sequential decision-making under uncertainty. It specifies the set of possible states, the actions available in each state, the immediate reward obtained for taking each action, and the probability distribution over next states given the current state and action.
An MDP is composed of four fundamental components: the state space S, the action set A, the state-transition probability distribution P, and the expected reward signal R. Solving an MDP generally begins from an initial state s_0. The agent then selects an action a_t, receives a reward r_t, and transitions to the next state s_{t+1} according to the transition probabilities P(s_{t+1}|s_t, a_t). This sequential process continues until the agent reaches a terminal state s_T (or continues indefinitely in the infinite-horizon case). A discount factor is employed to weigh the relative significance of future rewards compared to immediate rewards.
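To make these components concrete, the snippet below sketches a toy two-state MDP (the states, actions, probabilities, and rewards are invented purely for illustration) and shows how a single transition is sampled from P and R.
import numpy as np

states = [0, 1]
actions = [0, 1]
# P[s][a] is a probability distribution over next states.
P = {0: {0: [0.9, 0.1], 1: [0.2, 0.8]},
     1: {0: [0.5, 0.5], 1: [0.1, 0.9]}}
# R[s][a] is the expected immediate reward for taking action a in state s.
R = {0: {0: 0.0, 1: 1.0},
     1: {0: -1.0, 1: 2.0}}
gamma = 0.99  # discount factor

rng = np.random.default_rng(0)

def sample_transition(s, a):
    """Sample (reward, next_state) for taking action a in state s."""
    s_next = rng.choice(states, p=P[s][a])
    return R[s][a], s_next

r, s_next = sample_transition(0, 1)  # take action 1 in state 0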
2.2 Policy
A policy is a mapping from the state space to the action space: it specifies which action should be taken in each state. For example, if the goal is to swing up a cart-pole, a reasonable policy might try to keep the pole balanced while moving toward the center of the track. Another policy might select a random action with small probability in each state to ensure exploration, as sketched below.
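As a sketch (the gains and exploration probability here are made up for illustration), a policy can simply be a function from an observed state to an action, optionally mixing in random actions for exploration:
import numpy as np

rng = np.random.default_rng(0)

def policy(state, explore_prob=0.1):
    """Map a (theta, theta_dot) state to a torque in [-1, 1]."""
    theta, theta_dot = state
    if rng.random() < explore_prob:
        # With small probability, act randomly to ensure exploration.
        return rng.uniform(-1.0, 1.0)
    # Otherwise push against the current tilt and angular velocity.
    return float(np.clip(-2.0 * theta - 0.5 * theta_dot, -1.0, 1.0))

action = policy((0.3, -0.1))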
2.3 Value Function
The value function V(s) represents the expected long-term return obtained when starting from state s, i.e., the accumulated expected rewards over subsequent steps while acting according to the current policy π. The Bellman equation provides a recursive way of estimating the value function by relating the value of a state to the immediate reward and the value of the states that follow.
In its Bellman-optimality form, the value function can be written as V(s) = E[r + gamma * max_{a'} Q(s', a') | s], where Q(s', a') is the predicted total reward obtained after taking action a' from the next state s'. Here, gamma is the discount factor, which controls the significance of future rewards relative to the current reward: higher values of gamma assign greater weight to later rewards, whereas lower values place more emphasis on immediate rewards.
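For intuition, the short example below (with made-up numbers) shows how gamma weighs a reward sequence into a single return, and how a one-step Bellman backup combines an immediate reward with a bootstrapped next-state value.
gamma = 0.9
rewards = [1.0, 0.0, 2.0]  # a hypothetical three-step reward sequence r_0, r_1, r_2

# Discounted return: G = r_0 + gamma*r_1 + gamma^2*r_2 = 1.0 + 0.0 + 1.62 = 2.62
G = sum((gamma ** t) * r for t, r in enumerate(rewards))

# One-step Bellman backup using a bootstrapped estimate of the next state's value
r, V_next = 1.0, 5.0             # illustrative immediate reward and next-state value
V_estimate = r + gamma * V_next  # 1.0 + 0.9 * 5.0 = 5.5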
2.4 Deep Deterministic Policy Gradient (DDPG) Algorithm
DDPG is a model-free, off-policy actor-critic method based on deep neural networks. It combines ideas from DQN (Deep Q-Network) with the deterministic policy gradient. DDPG uses two separate neural networks, called the Actor network and the Critic network, to estimate the optimal policy and the value function: the Actor predicts the best action to take in a given state, while the Critic estimates the value of a state-action pair.
The Actor network is trained to learn a deterministic policy π that maximizes the expected total reward starting from any given state s. At each step t, the Actor computes an action a_t = π(s_t; θ^π) using the most recent version of its weights θ^π, with exploration noise added during training. It then executes this action in the environment to obtain the next state s_{t+1}, the reward r_t, and whether the episode has concluded. The Actor's parameters θ^π are subsequently updated via backpropagation through the Critic's estimate of the chosen action's value.
The Critic network estimates the action-value function Q(s, a) by minimizing the temporal-difference (TD) error. At each timestep t, the Critic assesses the quality of the action executed by the Actor: if the action led to a better outcome than predicted, the TD error is positive; if it led to a worse outcome, the TD error is negative. These errors are used to update the Critic's parameter vector θ^Q.
The two networks are trained jointly: the Critic's estimates provide the learning signal for the Actor, while an experience replay buffer stores transitions collected online so that updates can be performed on decorrelated minibatches as data is incrementally gathered. Slowly updated target copies of both networks are used to stabilize the TD targets.
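Concretely, for a sampled transition (s, a, r, s'), the Critic is regressed toward the target y = r + gamma * Q'(s', π'(s')), where Q' and π' denote the target Critic and target Actor. The Actor is then updated by ascending the gradient of Q(s, π(s)) with respect to its weights θ^π, and the target networks are moved slowly toward the online networks via θ' ← τ θ + (1 − τ) θ', with a small τ (e.g., 0.001).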
3. DDPG Implementation Details
Let us now implement the DDPG algorithm using the PyTorch library.
First, let us define the Environment class, which models the physical system under control. Here we build a straightforward pendulum swing-up environment: a massless rod attached at a pivot with a point mass at its end. The equations of motion are those of a simple pendulum, with some simplifications. Our objective is to train the agent to swing the pendulum upward from a hanging, stationary position and keep it there with near-zero angular velocity. The observation consists of the angle from the vertical and the angular velocity about the pivot axis (encoded as cos(theta), sin(theta), and theta_dot), and the action is the torque applied at the pivot.
import copy
import os
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from gym import Env
class SwingUpEnv(Env):
def __init__(self):
super().__init__()
# Define system parameters
self.masscart = 1.0 # mass of the cart (kg)
self.masspole = 0.1 # mass of the pole (kg)
self.total_mass = (self.masspole + self.masscart) # total mass (kg)
self.length = 0.5 # length of the pole (meter)
        self.polemass_length = (self.masspole * self.length) # pole mass times pole length (kg m)
# Define hyperparameters
        self.force_mag = 10.0 # gravitational acceleration (m/s^2)
self.tau = 0.02 # seconds between state updates
    def step(self, action):
        """Advance the pendulum by one timestep using Euler integration."""
        th, thdot = self.state  # th := theta (angle from the upright vertical)
                                # thdot := angular velocity about the pivot axis
        g = self.force_mag      # gravitational acceleration (m/s^2)
        m = self.masspole       # point mass at the end of the rod (kg)
        l = self.length         # rod length (m)
        u = float(np.clip(np.squeeze(action), -2.0, 2.0))  # clip the applied torque
        self.last_u = u
        # Pendulum dynamics: theta_ddot = 3*g/(2*l) * sin(theta) + 3/(m*l^2) * u
        newthdot = thdot + (3.0 * g / (2.0 * l) * np.sin(th) + 3.0 / (m * l ** 2) * u) * self.tau
        newth = th + newthdot * self.tau
        # Update the state variables using Euler's method
        self.state = np.array([newth, newthdot])
        observation = np.array([np.cos(newth), np.sin(newth), newthdot])
        # Reward is largest (1.0) when the pendulum is upright (theta = 0)
        reward = np.cos(newth)
        done = False
        info = {}
        return observation, reward, done, info
    def reset(self):
        """Reset the simulation to its initial conditions (hanging straight down, at rest)."""
        self.state = np.array([np.pi, 0.0])
        self.last_u = None
        return np.array([np.cos(self.state[0]), np.sin(self.state[0]), self.state[1]])
def render(self):
pass
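As a quick sanity check (purely illustrative), we can reset the environment and apply a few constant torques to see the observation and reward evolve:
env = SwingUpEnv()
obs = env.reset()
for _ in range(5):
    # Apply a small constant torque and inspect the resulting observation and reward
    obs, reward, done, info = env.step(np.array([0.5]))
    print(obs, reward)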
Next, we need to define the Actor and Critic networks. We will use three fully connected layers with ReLU activations; the Actor ends with a Tanh layer so that its output lies in [-1, 1].
class ActorNet(nn.Module):
def __init__(self, num_inputs, num_outputs):
super(ActorNet, self).__init__()
self.net = nn.Sequential(
nn.Linear(num_inputs, 400),
nn.ReLU(),
nn.Linear(400, 300),
nn.ReLU(),
nn.Linear(300, num_outputs),
nn.Tanh()
)
def forward(self, x):
return self.net(x)
class CriticNet(nn.Module):
def __init__(self, num_inputs, num_actions):
super(CriticNet, self).__init__()
self.net = nn.Sequential(
nn.Linear(num_inputs+num_actions, 400),
nn.ReLU(),
nn.Linear(400, 300),
nn.ReLU(),
nn.Linear(300, 1)
)
def forward(self, xs, a):
cat_input = torch.cat((xs, a), dim=1)
qvalue = self.net(cat_input)
return qvalue
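A quick shape check (a batch of four random 3-dimensional observations and the corresponding 1-dimensional actions) confirms that the two networks produce outputs of the expected shape; the sizes here match the pendulum observation defined above.
actor = ActorNet(num_inputs=3, num_outputs=1)
critic = CriticNet(num_inputs=3, num_actions=1)
obs = torch.randn(4, 3)    # batch of 4 observations (cos(theta), sin(theta), theta_dot)
acts = actor(obs)          # shape (4, 1), values in [-1, 1] because of the Tanh output
qvals = critic(obs, acts)  # shape (4, 1)
print(acts.shape, qvals.shape)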
We also need to define the DDPGAgent class, which contains methods for selecting actions, computing target Q values, training the networks, and periodically updating their weights.
class DDPGAgent():
def __init__(self, env):
self.env = env
        # Initialize actor and critic networks (3-dimensional observation, 1-dimensional action)
        self.actor = ActorNet(num_inputs=3, num_outputs=1)
        self.critic = CriticNet(num_inputs=3, num_actions=1)
        # Load saved models if they exist
        try:
            self.actor.load_state_dict(torch.load('model/ddpg_actor.pt'))
            self.critic.load_state_dict(torch.load('model/ddpg_critic.pt'))
        except FileNotFoundError:
            print("Trained models not found.")
        # Target networks start as copies of the online networks
        self.target_actor = copy.deepcopy(self.actor)
        self.target_critic = copy.deepcopy(self.critic)
        # Use CUDA if a GPU is available
        device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)
        self.actor.to(self.device)
        self.critic.to(self.device)
        self.target_actor.to(self.device)
        self.target_critic.to(self.device)
        # Create optimizer objects for both networks
        self.actor_optimizer = optim.Adam(self.actor.parameters())
        self.critic_optimizer = optim.Adam(self.critic.parameters())
        # Define the replay buffer
        self.buffer = Buffer(buffer_size=100000, batch_size=MINIBATCH_SIZE, seed=0)
    def compute_target_qvals(self, next_states, rewards, dones):
        """Compute the bootstrapped target Q values used for training the critic."""
        # Q-values predicted by the target critic for the target actor's actions in the next states
        with torch.no_grad():
            next_qs = self.target_critic(next_states, self.target_actor(next_states)).squeeze(1)
        # TD target: r + gamma * Q'(s', pi'(s')), zeroed out for terminal transitions
        targets = rewards + GAMMA * (1.0 - dones) * next_qs
        # Add a small amount of Gaussian noise to the targets as a simple regularizer
        # (note: exploration noise is normally added to actions, not to targets)
        targets += NOISE * torch.randn(targets.shape, device=self.device)
        return targets
    def train_model(self, experiences):
        """Train the actor and critic networks on one sampled minibatch."""
        # Extract the elements of the experience tuple and move them to the device
        obs_batch, acts_batch, rews_batch, next_obs_batch, dones_batch = \
            [x.to(self.device) for x in experiences]
        # Train the critic network by regressing its predictions onto the TD targets
        pred_qs = self.critic(obs_batch, acts_batch).squeeze(1)
        targets = self.compute_target_qvals(next_obs_batch, rews_batch, dones_batch)
        critic_loss = F.mse_loss(pred_qs, targets)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # Train the actor network to maximize the critic's value of its actions
        pi = self.actor(obs_batch)
        qs = self.critic(obs_batch, pi)
        actor_loss = -torch.mean(qs)
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # Slowly update the target networks toward the online networks
        soft_update(self.target_actor, self.actor, TAU)
        soft_update(self.target_critic, self.critic, TAU)
    def update_weights(self, experience):
        """Add an experience to the buffer and train once enough samples are available."""
        self.buffer.add(experience)
        if len(self.buffer) >= MINIBATCH_SIZE:
            mini_batch = self.buffer.sample()
            self.train_model(mini_batch)

    def act(self, state, epsilon):
        """Select an action from the actor, with epsilon-scaled Gaussian exploration noise."""
        state_t = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            action = self.actor(state_t).cpu().numpy().flatten()
        action += epsilon * np.random.randn(*action.shape)
        return np.clip(action, -1.0, 1.0)

    def save_models(self):
        """Save the trained models."""
        os.makedirs('model', exist_ok=True)
        torch.save(self.actor.state_dict(), 'model/ddpg_actor.pt')
        torch.save(self.critic.state_dict(), 'model/ddpg_critic.pt')
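The class above relies on a replay buffer class `Buffer` and a `soft_update` helper that are not defined in the original text. A minimal sketch of both, matching the way they are called here (uniform sampling, minibatches stacked into float tensors, Polyak averaging of the target weights), might look as follows.
import random
from collections import deque

import numpy as np
import torch

class Buffer:
    """Minimal uniform-sampling replay buffer that returns minibatches of float tensors."""
    def __init__(self, buffer_size, batch_size, seed=0):
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        random.seed(seed)

    def add(self, experience):
        """Store one (state, action, reward, next_state, done) tuple."""
        self.memory.append(experience)

    def sample(self):
        """Sample a random minibatch and stack it into tensors."""
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        def to_tensor(x):
            return torch.as_tensor(np.array(x), dtype=torch.float32)
        return (to_tensor(states),
                to_tensor(actions).reshape(self.batch_size, -1),
                to_tensor(rewards),
                to_tensor(next_states),
                to_tensor(dones))

    def __len__(self):
        return len(self.memory)

def soft_update(target_net, online_net, tau):
    """Polyak-average the online weights into the target weights: theta' <- tau*theta + (1-tau)*theta'."""
    for t_param, param in zip(target_net.parameters(), online_net.parameters()):
        t_param.data.copy_(tau * param.data + (1.0 - tau) * t_param.data)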
Now we can assemble all the components into a main script that runs the simulation and trains the agent. Once training is complete, we plot the results and assess the agent's performance.
if __name__ == "__main__":
    MAX_EPISODES = 1000 # Maximum number of episodes to run
    MAX_STEPS = 200 # Maximum steps per episode (the environment never signals termination)
    EPSILON = 1.0 # Initial epsilon value (scale of exploration noise)
    DECAY_RATE = 0.99 # Per-episode decay rate for epsilon
    EPSILON_MIN = 0.01 # Minimum epsilon value
    BUFFER_SIZE = int(1e6) # Size of the replay buffer
    LR_ACTOR = 1e-4 # Learning rate for actor
    LR_CRITIC = 1e-3 # Learning rate for critic
    WEIGHT_DECAY = 0 # L2 weight decay
    TAU = 0.001 # Soft target update parameter
    GAMMA = 0.99 # Discount factor
    NOISE = 0.1 # Exploration noise
    UPDATE_FREQ = 1 # How often to update the networks
    MINIBATCH_SIZE = 1024 # Minibatch size for training
# Create environment and agent
env = SwingUpEnv()
agent = DDPGAgent(env)
scores = [] # List to store scores per episode
scores_window = deque(maxlen=100) # Last 100 scores
epsilons = [] # List to store epsilon values
avg_scores = [] # Average score per 100 episodes
# Main loop
for i_episode in range(MAX_EPISODES):
state = env.reset()
score = 0 # Score for this episode
        # Exploration noise scale, decayed per episode
        epsilon = max(EPSILON_MIN, EPSILON * (DECAY_RATE ** i_episode))
        for t in range(MAX_STEPS):
            # Select an action from the current policy plus exploration noise
            action = agent.act(state, epsilon)
            # Take the action in the environment and receive the new state and reward
            next_state, reward, done, _ = env.step(action)
            # Store the experience and train once enough samples have been collected
            agent.update_weights((state, action, reward, next_state, done))
            # Update the state and add the reward to the cumulative episode score
            state = next_state
            score += reward
            if done:
                break
# Append scores and epsilon values to their corresponding lists
scores.append(score)
scores_window.append(score)
epsilons.append(epsilon)
average_score = np.mean(scores_window)
avg_scores.append(average_score)
# Print information every 100 episodes
if i_episode % 100 == 0:
print('\rEpisode {}\tAverage Score: {:.2f}\tepsilon: {:.2f}'.format(i_episode, average_score, epsilon))
# Save trained models after every 500 episodes
if i_episode % 500 == 0:
agent.save_models()
# Plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(len(avg_scores)), avg_scores)
plt.ylabel('Average Score')
plt.xlabel('Episode #')
plt.show()
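Finally, to assess the trained agent qualitatively, we can roll out the learned policy without exploration noise (epsilon = 0); this sketch simply reuses the environment and agent created above.
    # Greedy rollout of the trained policy (no exploration noise)
    state = env.reset()
    total_reward = 0.0
    for _ in range(MAX_STEPS):
        action = agent.act(state, epsilon=0.0)
        state, reward, _, _ = env.step(action)
        total_reward += reward
    print('Greedy-policy return over one episode: {:.2f}'.format(total_reward))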
