A Beginner’s Guide to Understanding the DQN Algorithm
Author: 禅与计算机程序设计艺术
1. Introduction
Deep Q-Networks (DQN) is a reinforcement learning method that builds on and improves the Q-learning algorithm. It can converge efficiently even in dynamically changing environments, and it keeps refining its policy with the help of a replay memory. DQN can be viewed as a learning agent with its own memory that continually improves its decision-making through interaction with the environment.
1.1 Preface
Starting from the basics, this article explains the concepts and core ideas behind DQN, with an emphasis on how the algorithm works, the details of its implementation, and its practical applications. It is written to help readers who are just getting started with DQN, and suggestions from experts and colleagues for improving it are very welcome.
2. Background
Let us first review the basics of reinforcement learning (RL). RL studies how reward and penalty signals can drive behaviour that yields long-term cumulative benefit: the agent's goal is to learn a behaviour pattern that maximizes its long-term return in a complex environment. Put simply, RL is about how a machine (or a person) learns from interaction with an environment and improves itself so as to obtain the largest possible reward. Through continual interaction with the environment, the agent gradually acquires an optimal policy for making decisions.
To describe an RL problem more precisely, we first need to define some basic terms (a small interaction-loop sketch follows the list):
Agent: the decision-making entity in RL. It receives information from the environment, performs actions, and keeps adjusting its behaviour based on the feedback it gets; its goal is to maximize the cumulative reward, and its performance is evaluated by the long-term return it achieves.
Environment: the external system the agent operates in. It supplies the agent with observations and returns feedback for the agent's actions; its complexity determines how hard the task is, ranging from robot navigation in the physical world to character control in a video game.
Action: the concrete operation the agent performs in response to what it perceives, chosen from a (typically finite) set of options, for example moving or rotating.
Observation: the data the agent perceives from the environment, such as images, audio, or tactile feedback; it reflects the salient features of the current state.
Reward: the immediate numerical feedback the agent receives after performing an action; it measures how good or bad that action was.
State: a description of the situation the agent currently finds itself in.
Policy: a probability distribution over the possible actions in a given state.
Value function: V(s) is the expected cumulative return obtained starting from state s.
Model: a description of how the environment's state changes when the agent performs a given action.
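To make these terms concrete, the following is a minimal sketch of the agent-environment interaction loop, using a purely random policy on Gym's CartPole-v1 (illustrative only; like the code later in this article, it assumes the classic Gym API of gym < 0.26):

import gym

env = gym.make('CartPole-v1')
observation = env.reset()                    # initial observation of the state
total_reward = 0.0
done = False
while not done:
    action = env.action_space.sample()       # the (random) policy picks an action
    observation, reward, done, info = env.step(action)   # the environment responds
    total_reward += reward                   # accumulate the reward signal
print(f'Episode return: {total_reward}')
env.close()

DQN replaces the random policy above with one derived from a learned action-value function.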
Deep Q-Network (DQN) is an agent built on an artificial neural network that gradually improves its decision-making by trial and error in a dynamic environment. Its key strength is that it learns and adapts to changes in the environment on its own and can perform well even with limited training data; for the same computational budget it is a competitive alternative to approaches from classical control theory.
3. Core Algorithm: Principles, Operational Steps, and Mathematical Formulas
3.1 Q-Learning
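Q-learning learns an action-value function Q(s, a): the expected cumulative (discounted) reward obtained by taking action a in state s and following the current policy afterwards. After each observed transition (s_t, a_t, r, s_{t+1}), the tabular update rule is

Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t) \right]

where \alpha is the learning rate and \gamma is the discount factor. When the state space is large or continuous, keeping a table of all (s, a) pairs becomes infeasible, which motivates approximating Q with a neural network, as described in the next subsection.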
3.2 Deep Q-Network (DQN)
DQN implements reinforcement learning with a deep neural network: its core idea is to use the network to approximate the action-value function. Compared with supervised learning, which needs labelled training data, DQN greatly reduces the need for high-quality annotations, because it trains on samples generated by interacting with the environment. Its basic structure is as follows.
The DQN architecture has two main components: the main (online) network and the target network. The main network takes the current state s_t and outputs estimated action values q_\theta(s_t, a), which are used to select actions and to update the policy. The target network is a copy of the main network whose parameters are only slowly (or periodically) synchronized with it, which makes training more stable. The training target is:
y_{target} = r + \gamma \max_{a'} q_{\theta'}(s_{t+1}, a')
where r is the immediate reward, \gamma is the discount factor, and the max term is the largest Q value, predicted by the target network, over the actions a' available in the next state s_{t+1}.
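As a concrete illustration with arbitrary numbers: if r = 1, \gamma = 0.99, and the target network's largest predicted Q value in s_{t+1} is 5, then y_{target} = 1 + 0.99 \times 5 = 5.95.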
The DQN algorithm proceeds in the following steps (the loss minimized during the network update is written out after this list):
Generate training data and store it in a replay buffer: let the current policy interact with the environment and store the resulting transitions in the buffer.
Estimate action values: sample transitions from the buffer and run the main network on them to obtain predicted action values.
Update the main network: use the observed reward r and the target values to update the network's parameters.
Use the target network to stabilize learning: at fixed intervals, move the target network's parameters towards the main network's parameters, which improves stability and accuracy.
Repeat the steps above until a stopping criterion is met.
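Concretely, the network update minimizes the mean squared error between the predicted and target Q values (this is the loss computed in the code in Section 4):

L(\theta) = \mathbb{E}\left[ \left( y_{target} - q_\theta(s_t, a_t) \right)^2 \right]

where the expectation is taken over transitions sampled from the replay buffer.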
Advantages of DQN:
- Learning does not require a labelled dataset; training data is generated through interaction with the environment.
- It can learn good policies over continuous state spaces.
Disadvantages:
- Training the model takes considerably more time.
- The learned value function is only an approximation, which can limit the accuracy of the resulting policy.
4. Concrete Code Examples and Explanations
4.1 Dataset Preparation
Training dataset: a collection of five-tuples (state, action, reward, next state, done flag) gathered from interaction with the environment; minibatches sampled from this collection are used to train the neural network so that its predicted action values become increasingly accurate. A minimal buffer sketch follows.
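A minimal sketch of such a replay buffer, using Python's deque (the capacity and the example transition values below are placeholders):

import random
from collections import deque

memory = deque(maxlen=100000)        # the oldest transitions are dropped automatically

# One transition: (state, action, reward, next_state, done)
memory.append(([0.02, 0.1, -0.03, 0.0], 1, 1.0, [0.03, 0.3, -0.04, -0.2], False))

if len(memory) >= 32:
    mini_batch = random.sample(memory, 32)   # uniform random minibatch for training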
4.2 Model Design
The DQN architecture is quite flexible: depending on the application, different network types can be used, including convolutional neural networks (CNNs), recurrent neural networks (RNNs), and so on. In this article we use the most basic option, a fully connected network.
import tensorflow as tf

class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(128, activation='relu')
        self.dense2 = tf.keras.layers.Dense(128, activation='relu')
        # Note: `output` is an attribute reserved by tf.keras.Model, so use another name.
        # `env` is the Gym environment created in the complete example (Section 4.7).
        self.q_values = tf.keras.layers.Dense(env.action_space.n, activation='linear')

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        return self.q_values(x)

model = MyModel()
4.3 Agent Policy Design
DQN selects actions with an ε-greedy policy: with probability ε the agent takes a random action, which keeps it exploring, and otherwise it takes the greedy action with the highest predicted Q value. The parameter ε controls the fraction of random actions; the smaller ε is, the more greedy the behaviour becomes.
def get_epsilon(current_step: int) -> float:
    """
    Get ε for the ε-greedy policy.

    ε stays at EPSILON_START while the replay buffer is being pre-filled, then
    decays linearly to EPSILON_END, which it reaches at FINAL_EXPLORATION_FRAME.

    Args:
        current_step: The number of steps taken so far in training.

    Returns:
        An ε value.
    """
    if current_step < INITIAL_REPLAY_SIZE:
        return EPSILON_START
    elif current_step > FINAL_EXPLORATION_FRAME:
        return EPSILON_END
    else:
        return EPSILON_START - (EPSILON_START - EPSILON_END) * (current_step - INITIAL_REPLAY_SIZE) / (
            FINAL_EXPLORATION_FRAME - INITIAL_REPLAY_SIZE)
4.4 Target Network Design
A DQN implementation maintains two models, the main network and a target network. The target network exists to provide stable targets while the main network's parameters are being optimized. It is updated by slowly blending the main network's weights into its own (a soft, or Polyak, update), controlled by the coefficient UPDATE_TARGET_WEIGHT.
# keras.models.clone_model does not support subclassed models in most TF versions,
# so create a second instance of the same architecture and copy the weights instead.
target_network = MyModel()
dummy_state = tf.zeros((1,) + env.observation_space.shape)
model(dummy_state)                 # call both networks once so that their weights exist
target_network(dummy_state)
target_network.set_weights(model.get_weights())
for layer in target_network.layers:
    layer.trainable = False        # the target network is never trained directly

def update_target():
    # Soft (Polyak) update: blend the main network's weights into the target network.
    weights = model.get_weights()
    target_weights = target_network.get_weights()
    for i in range(len(target_weights)):
        target_weights[i] = weights[i] * UPDATE_TARGET_WEIGHT + target_weights[i] * (1 - UPDATE_TARGET_WEIGHT)
    target_network.set_weights(target_weights)

update_target()
4.5 Training Loop
Each training episode goes through the following stages: initialize the environment, choose ε-greedy actions, store the resulting transitions in the replay buffer, and update the main network on sampled minibatches, synchronizing the target network after each update.
def train(current_step: int, env_: gym.Env):
    """Run one training episode and return the updated global step count."""
    global episode_count
    total_reward = 0
    observation = env_.reset()
    episode_length = 0
    while True:
        # Get an ε-greedy action from the model's policy network.
        if np.random.uniform(0, 1) <= get_epsilon(current_step):
            action = random.randint(0, env_.action_space.n - 1)
        else:
            obs_input = np.expand_dims(observation, axis=0).astype('float32')
            action = np.argmax(model(obs_input)[0])
        # Perform the chosen action and observe the next state and reward.
        new_observation, reward, done, info = env_.step(action)
        total_reward += reward
        # Store the transition in the replay buffer.
        transition = [observation, action, reward, new_observation, done]
        memory.append(transition)
        # Start training once we have enough samples in the replay buffer.
        if len(memory) >= MINI_BATCH_SIZE:
            # Sample a minibatch of randomly selected transitions.
            mini_batch = random.sample(memory, MINI_BATCH_SIZE)
            # Unpack the states, actions, rewards, etc. from the minibatch (cast to float32 for TF).
            obs_batch = np.array([obs for obs, _, _, _, _ in mini_batch], dtype='float32')
            act_batch = np.array([act for _, act, _, _, _ in mini_batch]).reshape((-1,))
            rew_batch = np.array([rew for _, _, rew, _, _ in mini_batch], dtype='float32').reshape((-1,))
            nxt_obs_batch = np.array([nxt_obs for _, _, _, nxt_obs, _ in mini_batch], dtype='float32')
            dones_batch = np.array([d for _, _, _, _, d in mini_batch], dtype='float32').reshape((-1,))
            with tf.GradientTape() as tape:
                # Predict the Q values for each state in the minibatch with the main network.
                pred_q_values = model(obs_batch)
                # Keep only the Q values of the actions that were actually taken.
                pred_actions = tf.reduce_sum(pred_q_values * tf.one_hot(act_batch, depth=env_.action_space.n), axis=-1)
                # Compute target Q values with the target network.
                tgt_q_values = target_network(nxt_obs_batch)
                max_tgt_q_values = tf.reduce_max(tgt_q_values, axis=-1)
                y_batch = rew_batch + GAMMA * (1 - dones_batch) * max_tgt_q_values
                # Loss between predicted and target Q values.
                loss = tf.keras.losses.mean_squared_error(y_batch, pred_actions)
            grads = tape.gradient(loss, model.trainable_variables)
            opt.apply_gradients(zip(grads, model.trainable_variables))
            update_target()
        # Update counters and record statistics at the end of each episode.
        current_step += 1
        episode_length += 1
        if done or episode_length == MAX_EPISODE_LENGTH:
            episode_count += 1
            writer.add_scalar("Episode length", episode_length, global_step=episode_count)
            writer.add_scalar("Reward per episode", total_reward, global_step=episode_count)
            print(f'Ep {episode_count}: Step {current_step}, Episode Length={episode_length}, Total Reward={total_reward}')
            break
        observation = new_observation
    return current_step
4.6 Evaluation
During evaluation, only the main network is used to predict action values, and no parameters are updated.
def test(env_: gym.Env, render: bool = False):
    """Test the trained agent greedily (no exploration, no parameter updates)."""
    observation = env_.reset()
    total_reward = 0
    episode_length = 0
    while True:
        if render:
            env_.render()
        # Always take the greedy action predicted by the main network.
        obs_input = np.expand_dims(observation, axis=0).astype('float32')
        action = np.argmax(model(obs_input)[0])
        observation, reward, done, info = env_.step(action)
        total_reward += reward
        episode_length += 1
        if done or episode_length == MAX_TESTING_EPISODE_LENGTH:
            print(f'Testing Episode Length={episode_length}, Total Reward={total_reward}')
            break
4.7 Complete Example
The complete example code is as follows:
import gym
import numpy as np
from collections import deque
import random
from tensorflow import keras
from datetime import datetime
import tensorflow as tf
from tensorboardX import SummaryWriter
# Note: this example uses the classic Gym API (gym < 0.26), where reset() returns only
# the observation and step() returns (observation, reward, done, info).
env = gym.make('CartPole-v1')
# Hyperparameters
GAMMA = 0.99
LEARNING_RATE = 0.001
UPDATE_TARGET_WEIGHT = 0.01
INITIAL_REPLAY_SIZE = 1000
FINAL_EXPLORATION_FRAME = 1000000
EPSILON_START = 1.0
EPSILON_END = 0.1
EPSILON_DECAY_STEPS = 500000
MINI_BATCH_SIZE = 32
MAX_EPISODE_LENGTH = 200
MAX_TESTING_EPISODE_LENGTH = 1000
# Other hyperparameters
MEMORY_CAPACITY = 100000
OBSERVATION_SHAPE = env.observation_space.shape
ACTION_SPACE = env.action_space.n
writer = SummaryWriter('logs/' + 'dqn_' + datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(128, activation='relu')
        self.dense2 = tf.keras.layers.Dense(128, activation='relu')
        # Note: `output` is an attribute reserved by tf.keras.Model, so use another name.
        self.q_values = tf.keras.layers.Dense(ACTION_SPACE, activation='linear')

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        return self.q_values(x)

model = MyModel()
opt = tf.optimizers.Adam(learning_rate=LEARNING_RATE)
memory = deque(maxlen=MEMORY_CAPACITY)
episode_count = 0
def get_epsilon(current_step: int) -> float:
    """Get ε for the ε-greedy policy (linear decay from EPSILON_START to EPSILON_END)."""
    if current_step < INITIAL_REPLAY_SIZE:
        return EPSILON_START
    elif current_step > FINAL_EXPLORATION_FRAME:
        return EPSILON_END
    else:
        return EPSILON_START - (EPSILON_START - EPSILON_END) * (current_step - INITIAL_REPLAY_SIZE) / (
            FINAL_EXPLORATION_FRAME - INITIAL_REPLAY_SIZE)
# keras.models.clone_model does not support subclassed models in most TF versions,
# so create a second instance of the same architecture and copy the weights instead.
target_network = MyModel()
dummy_state = tf.zeros((1,) + OBSERVATION_SHAPE)
model(dummy_state)                 # call both networks once so that their weights exist
target_network(dummy_state)
target_network.set_weights(model.get_weights())
for layer in target_network.layers:
    layer.trainable = False        # the target network is never trained directly

def update_target():
    # Soft (Polyak) update: blend the main network's weights into the target network.
    weights = model.get_weights()
    target_weights = target_network.get_weights()
    for i in range(len(target_weights)):
        target_weights[i] = weights[i] * UPDATE_TARGET_WEIGHT + target_weights[i] * (1 - UPDATE_TARGET_WEIGHT)
    target_network.set_weights(target_weights)
def train(current_step: int, env_: gym.Env):
    """Train the agent for one episode and return the updated global step count."""
    global episode_count
    total_reward = 0
    observation = env_.reset()
    episode_length = 0
    while True:
        # Get an ε-greedy action from the model's policy network.
        if np.random.uniform(0, 1) <= get_epsilon(current_step):
            action = random.randint(0, ACTION_SPACE - 1)
        else:
            obs_input = np.expand_dims(observation, axis=0).astype('float32')
            action = np.argmax(model(obs_input)[0])
        # Perform the chosen action and observe the next state and reward.
        new_observation, reward, done, info = env_.step(action)
        total_reward += reward
        # Store the transition in the replay buffer.
        transition = [observation, action, reward, new_observation, done]
        memory.append(transition)
        # Start training once we have enough samples in the replay buffer.
        if len(memory) >= MINI_BATCH_SIZE:
            # Sample a minibatch of randomly selected transitions.
            mini_batch = random.sample(memory, MINI_BATCH_SIZE)
            # Unpack the states, actions, rewards, etc. from the minibatch (cast to float32 for TF).
            obs_batch = np.array([obs for obs, _, _, _, _ in mini_batch], dtype='float32')
            act_batch = np.array([act for _, act, _, _, _ in mini_batch]).reshape((-1,))
            rew_batch = np.array([rew for _, _, rew, _, _ in mini_batch], dtype='float32').reshape((-1,))
            nxt_obs_batch = np.array([nxt_obs for _, _, _, nxt_obs, _ in mini_batch], dtype='float32')
            dones_batch = np.array([d for _, _, _, _, d in mini_batch], dtype='float32').reshape((-1,))
            with tf.GradientTape() as tape:
                # Predict the Q values for each state in the minibatch with the main network.
                pred_q_values = model(obs_batch)
                # Keep only the Q values of the actions that were actually taken.
                pred_actions = tf.reduce_sum(pred_q_values * tf.one_hot(act_batch, depth=ACTION_SPACE), axis=-1)
                # Compute target Q values with the target network.
                tgt_q_values = target_network(nxt_obs_batch)
                max_tgt_q_values = tf.reduce_max(tgt_q_values, axis=-1)
                y_batch = rew_batch + GAMMA * (1 - dones_batch) * max_tgt_q_values
                # Loss between predicted and target Q values.
                loss = tf.keras.losses.mean_squared_error(y_batch, pred_actions)
            grads = tape.gradient(loss, model.trainable_variables)
            opt.apply_gradients(zip(grads, model.trainable_variables))
            update_target()
        # Update counters and record statistics at the end of each episode.
        current_step += 1
        episode_length += 1
        if done or episode_length == MAX_EPISODE_LENGTH:
            episode_count += 1
            writer.add_scalar("Episode length", episode_length, global_step=episode_count)
            writer.add_scalar("Reward per episode", total_reward, global_step=episode_count)
            print(f'Ep {episode_count}: Step {current_step}, Episode Length={episode_length}, Total Reward={total_reward}')
            break
        observation = new_observation
    return current_step
def test(env_: gym.Env, render: bool = False):
    """Test the trained agent greedily (no exploration, no parameter updates)."""
    observation = env_.reset()
    total_reward = 0
    episode_length = 0
    while True:
        if render:
            env_.render()
        # Always take the greedy action predicted by the main network.
        obs_input = np.expand_dims(observation, axis=0).astype('float32')
        action = np.argmax(model(obs_input)[0])
        observation, reward, done, info = env_.step(action)
        total_reward += reward
        episode_length += 1
        if done or episode_length == MAX_TESTING_EPISODE_LENGTH:
            print(f'Testing Episode Length={episode_length}, Total Reward={total_reward}')
            break
if __name__ == '__main__':
    current_step = 0
    for ep in range(1000):
        # Keep a running global step count so that the ε schedule decays across episodes.
        current_step = train(current_step, env)
        test(env)
    writer.close()
    env.close()
