【Python】SUMO + Python 联合仿真平台
第五部分:集成强化学习算法进行交通优化
强化学习的核心思想是让智能体 (Agent) 通过与环境 (Environment) 的交互来学习如何做出最优决策,以最大化累积奖励 (Cumulative Reward)。在交通场景中,智能体可以是交通信号控制器、自动驾驶车辆或交通管理中心,环境则是 SUMO 模拟的交通路网,奖励则与交通效率、安全、环保等目标相关。
5.1 强化学习与交通优化的契合点
交通系统本质上是动态的、随机的、高度复杂的系统,传统的基于规则或模型的优化方法在面对不断变化的需求和突发事件时,往往显得力不从心。强化学习的优势在于:
- 自适应性 (Adaptability) :RL 智能体能够通过不断试错和学习,适应不同的交通状况和环境变化,而无需显式地对所有可能情况进行建模。
- 处理复杂性 (Handling Complexity) :RL 可以处理高维度的状态空间和动作空间,适用于复杂的交通网络和多目标优化问题。
- 数据驱动决策 (Data-Driven Decisions) :RL 从与环境的交互数据中学习策略,可以发现传统方法难以察觉的优化模式。
- 长期优化 (Long-term Optimization) :RL 的目标是最大化长期累积奖励,能够学习具有远见的控制策略,而不仅仅是贪婪地优化当前状态。
在交通领域,强化学习已经被广泛应用于:
- 自适应交通信号控制 (Adaptive Traffic Signal Control, ATSC) :根据实时车流动态调整信号灯配时,减少延误,提高交叉口通行能力。
- 匝道汇入控制 (Ramp Metering) :控制车辆进入高速公路的速率,缓解主路拥堵。
- 动态路径诱导 (Dynamic Route Guidance) :为车辆提供最优路径建议,均衡路网负载。
- 自动驾驶车辆决策 (Autonomous Vehicle Decision Making) :例如换道、超车、路口通行等行为决策。
- 车队协同控制 (Platoon Coordination) :优化车队的行驶速度和间距,提高道路容量。
5.2 强化学习基础回顾
为了更好地理解后续内容,我们简要回顾一下强化学习的核心概念:
-
智能体 (Agent) :学习者和决策者。
-
环境 (Environment) :智能体所处的外部世界,与智能体交互。
-
状态 (State, S) :对环境在某一时刻的描述。例如,在交通信号控制中,状态可以是各个进口道的排队长度、当前信号相位等。
-
动作 (Action, A) :智能体可以执行的操作。例如,选择下一个绿灯相位,或者保持当前相位。
-
奖励 (Reward, R) :环境对智能体在某个状态下执行某个动作后反馈的标量信号,评价该动作的好坏。例如,负的车辆总延误、负的排队长度等。
-
策略 (Policy, π) :智能体从状态到动作的映射,即在给定状态下选择某个动作的规则或概率分布。π(a|s) = P[A_t = a | S_t = s]。
-
价值函数 (Value Function) :
- 状态价值函数 (State-Value Function, V π(s)): 从状态 s 开始,遵循策略 π 能获得的期望累积奖励。
Vπ(s) = Eπ[∑k=0∞ γk Rt+k+1 | St = s] - 动作价值函数 (Action-Value Function, Q π(s, a)): 在状态 s 执行动作 a后,继续遵循策略 π 能获得的期望累积奖励。
Qπ(s, a) = Eπ[∑k=0∞ γk Rt+k+1 | St = s, At = a]
其中 γ (0 ≤ γ ≤ 1) 是折扣因子,表示未来奖励相对于当前奖励的重要性。
- 状态价值函数 (State-Value Function, V π(s)): 从状态 s 开始,遵循策略 π 能获得的期望累积奖励。
-
马尔可夫决策过程 (Markov Decision Process, MDP) :强化学习问题通常被建模为 MDP,其核心是状态具有马尔可夫性,即未来状态只依赖于当前状态和动作,与历史状态无关。
-
探索与利用 (Exploration vs. Exploitation) :
- 利用 (Exploitation) :选择当前已知的能够带来最大奖励的动作。
- 探索 (Exploration) :尝试新的动作,以发现可能更优的策略,即使这些动作当前看起来不是最优的。
在学习过程中需要平衡这两者。
5.3 在 SUMO 中集成强化学习的通用框架
将强化学习算法应用于 SUMO 仿真进行交通优化,通常遵循以下框架:
定义强化学习问题 :
* **智能体 (Agent)** : 通常是 Python 脚本中实现的 RL 算法。
* **环境 (Environment)** : SUMO 仿真实例。TraCI 作为智能体与 SUMO 环境交互的桥梁。
核心交互循环 :
* **步骤 1: 状态获取 (State Observation)** * Python 脚本通过 TraCI 从 SUMO 中获取当前交通状态。
* 这些状态信息需要被构造成 RL 智能体能够理解的形式(状态向量或张量)。
* 例如:`traci.lanearea.getJamLengthVehicle("detector_id")` 获取检测器覆盖区域的排队车辆数,`traci.trafficlight.getPhase("tls_id")` 获取信号灯当前相位。
* **步骤 2: 动作选择 (Action Selection)** * RL 智能体根据当前观察到的状态和其学习到的策略,选择一个动作。
* 对于初学者,可能会使用如 ε-greedy 策略来平衡探索和利用。
* **步骤 3: 动作执行 (Action Execution)** * Python 脚本通过 TraCI 将智能体选择的动作应用到 SUMO 环境中。
* 例如:`traci.trafficlight.setPhase("tls_id", phase_index)` 设置信号灯的新相位。
* **步骤 4: 环境演化与奖励计算 (Environment Step & Reward Calculation)** * 调用 `traci.simulationStep()` 使 SUMO 仿真向前推进一个或多个时间步。
* 在新的仿真步之后,计算由于上一个动作导致的奖励。奖励函数的设计至关重要,它直接引导智能体的学习方向。
* 例如,奖励可以是负的累积车辆等待时间变化量,或者负的交叉口总排队长度。
* **步骤 5: 智能体学习/更新 (Agent Learning/Update)** * 智能体使用 (当前状态, 动作, 奖励, 下一状态) 这个经验元组来更新其内部模型或策略。
* 例如,在 Q-learning 中,更新 Q 表;在 DQN 中,更新神经网络的权重。
* **步骤 6: 循环/终止 (Loop/Termination)** * 重复步骤 1-5,直到达到预设的训练轮次 (episodes) 或仿真时间,或者满足某个终止条件。
关键设计要素 :
* **状态表示 (State Representation)** :
* 如何将 SUMO 中的原始交通数据转换为有意义且对 RL 算法有效的状态特征。
* 需要包含足够的信息以做出明智决策,但也要避免维度灾难。
* 常见的状态特征包括:各个进口道的排队长度、车辆密度、平均速度、当前信号相位、已持续时间等。
* **动作空间 (Action Space)** :
* 定义智能体可以执行的操作集合。
* 对于交通信号控制:
* 离散动作:选择下一个要切换到的信号相位。
* 参数化动作:决定当前绿灯相位的持续时间。
* 对于车辆控制:改变速度、换道、选择路径。
* **奖励函数 (Reward Function Design)** :
* 这是 RL 应用中最具挑战性也最关键的部分之一。
* 奖励函数必须准确反映优化目标。
* 例如:
* 最小化延误:`reward = -total_delay` 或 `reward = previous_total_delay - current_total_delay`。
* 最小化排队长度:`reward = -total_queue_length`。
* 最大化吞吐量:`reward = number_of_vehicles_passed`。
* 组合奖励:`reward = w1 * (-delay) + w2 * (-queue_length) + w3 * (throughput)`。
* 需要仔细设计以避免智能体学到非预期的"捷径"行为。
5.4 案例:基于 Q-learning 的自适应交通信号控制
Q-learning 是一种经典的、基于价值的、离策略 (off-policy) 的强化学习算法。它学习一个动作价值函数 Q(s, a),表示在状态 s 下执行动作 a 后能够获得的期望累积回报。
5.4.1 问题定义
- 目标 : 控制单个交叉口的交通信号灯,以最小化车辆的平均等待时间或总排队长度。
- 环境 : SUMO 模拟的单个交叉口。
- 智能体 : Q-learning 算法。
5.4.2 状态表示 (State Representation)
为了简化,我们采用离散化的状态表示。假设一个四路交叉口,每个进口道有一个主要的直行/左转相位组合。
- 例如,可以将每个进口道的排队长度离散化为几个等级(如:0-5辆车为等级0,6-10辆车为等级1,11-15辆车为等级2,>15辆车为等级3)。
- 当前有效的信号相位也可以作为状态的一部分。
假设有四个进口道(北N, 南S, 东E, 西W),每个进口道排队长度有 L 个等级。
如果只考虑排队长度,状态可以是 (q_N, q_S, q_E, q_W),其中 q_i 是第 i 个进口道的排队长度等级。
状态空间大小为 L4。如果再加上当前信号相位(假设有 P 个相位),则状态空间大小为 L4 * P。
示例:简化状态
为了代码演示,我们可以将状态简化为更易于管理的形式。例如,对于一个简单的十字路口,我们可以定义状态为代表各主要方向拥堵情况的元组。
假设交叉口 tls_id = "J1"。
我们将定义4个相位(例如):
- Phase 0: 南北直行绿灯 (NS_Green)
- Phase 1: 南北左转绿灯 (NSL_Green) (如果需要)
- Phase 2: 东西直行绿灯 (EW_Green)
- Phase 3: 东西左转绿灯 (EWL_Green) (如果需要)
为了简化,假设我们只控制两个主相位:NS 向绿灯,EW 向绿灯。
- 状态 (State) :我们可以将每个方向(N, S, E, W)进入交叉口前一定距离内的车辆数作为状态的一部分。
例如,状态可以定义为(num_veh_N, num_veh_S, num_veh_E, num_veh_W, current_phase_index)。
为了使 Q 表可管理,这些车辆数需要被离散化。
例如,0-5 辆车 -> 0, 6-10 辆车 -> 1, >10 辆车 -> 2。
如果current_phase_index有2个值 (0 for NS_Green, 1 for EW_Green)。
那么状态空间大小是3 * 3 * 3 * 3 * 2 = 162个状态。这对于 Q-table 来说是可行的。
5.4.3 动作空间 (Action Space)
- 动作 : 智能体在每个决策时刻选择下一个要激活的信号相位。
- 对于我们的双相位示例:
- 动作 0: 切换到/保持 NS_Green 相位。
- 动作 1: 切换到/保持 EW_Green 相位。
5.4.4 奖励函数 (Reward Function)
- 目标 : 减少排队。
- 奖励 : 可以是负的当前交叉口总排队长度。
reward = - (total_queue_length_N + total_queue_length_S + total_queue_length_E + total_queue_length_W)
或者,奖励可以是切换前后排队长度的变化:
reward = previous_total_queue - current_total_queue
5.4.5 Q-learning 算法
- Q 表 : 一个表格,存储每个 (状态, 动作) 对的 Q 值,
Q[state][action]。 - 学习率 (Learning Rate, α) : 控制新学习到的信息在多大程度上覆盖旧信息。
- 折扣因子 (Discount Factor, γ) :衡量未来奖励的重要性。
- 探索率 (Exploration Rate, ε) : 在 ε-greedy 策略中,以 ε 的概率随机选择动作(探索),以 1-ε 的概率选择当前 Q 值最大的动作(利用)。
Q 值更新规则 :
当智能体从状态 s 执行动作 a,得到奖励 r,并转移到新状态 s' 时,Q 值的更新如下:
Q(s, a) ← Q(s, a) + α * [r + γ * max<sub>a'</sub>(Q(s', a')) - Q(s, a)]
5.4.6 Python + TraCI 实现 Q-learning 交通信号控制 (概念性代码)
假设我们已经配置好了 SUMO 场景 (.sumocfg 文件),其中包含一个名为 "J1" 的交叉口,以及相应的检测器来获取排队长度。
import traci
import sumolib # 用于解析网络等
import numpy as np
import random
import os
import sys
# SUMO 环境变量配置 (如果需要)
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
# Q-learning 参数
ALPHA = 0.1 # 学习率
GAMMA = 0.9 # 折扣因子
EPSILON_START = 1.0 # 初始探索率
EPSILON_END = 0.05 # 最终探索率
EPSILON_DECAY = 0.995 # 探索率衰减因子
NUM_EPISODES = 1000 # 训练的总轮次数
SIMULATION_STEPS_PER_EPISODE = 3600 # 每轮仿真的步数
ACTION_DURATION = 10 # 每个选择的相位至少持续10个仿真步
# 交叉口和相位信息
TLS_ID = "J1" # 你的交通灯ID
# 假设我们只有两个主要相位: 0 (NS绿灯), 1 (EW绿灯)
# 实际中需要根据你的 .net.xml 文件中的 <tlLogic> 定义来确定
# 例如,相位0可能对应 traci.trafficlight.getCompleteRedYellowGreenDefinition(TLS_ID)[0].phases[0]
# 这里简化为相位索引
PHASE_NS_GREEN_INDEX = 0 # 假设南北绿灯是程序中的第0个相位
PHASE_EW_GREEN_INDEX = 2 # 假设东西绿灯是程序中的第2个相位 (跳过黄灯)
# 确保这些索引与你的信号灯定义匹配,或者你可以定义一个更详细的相位列表
# 例如 programID = traci.trafficlight.getProgram(TLS_ID)
# phases = traci.trafficlight.getCompleteRedYellowGreenDefinition(TLS_ID)[0].phases
# green_phases_indices = [i for i, p in enumerate(phases) if 'G' in p.state or 'g' in p.state]
# 为了简单,我们直接指定动作就是切换到这两个核心相位
ACTIONS = [PHASE_NS_GREEN_INDEX, PHASE_EW_GREEN_INDEX] # 可选动作:激活NS绿灯或EW绿灯
NUM_ACTIONS = len(ACTIONS)
# 状态离散化
# 假设有4条入口道,每条道的检测器ID
# 例如: "det_N_J1_0", "det_S_J1_0", "det_E_J1_0", "det_W_J1_0"
INCOMING_LANES_DETECTORS = {
"N": ["lane_N_in_0"], # 假设北向进口道上的检测器对应车道 lane_N_in_0
"S": ["lane_S_in_0"], # 南向
"E": ["lane_E_in_0"], # 东向
"W": ["lane_W_in_0"] # 西向
}
# 实际应用中,检测器通常是 e1Detector 或 e2Detector (lane area detector)
# 这里用 getLaneIDList 获取车道,再用 getLastStepVehicleNumber 获取车道上的车辆数作为简化队列
# 或者使用 traci.lanearea.getJamLengthVehicle() 如果定义了 laneAreaDetector
MAX_CARS_PER_LANE_STATE = 3 # 离散化等级: 0 (0-5), 1 (6-10), 2 (>10)
# Q 表初始化
# 状态: (q_N, q_S, q_E, q_W, current_phase_logic_index)
# current_phase_logic_index: 0 for NS, 1 for EW (代表当前哪个方向是绿灯)
# q_N, q_S, q_E, q_W 的取值范围是 0, 1, 2
# current_phase_logic_index 的取值范围是 0, 1
q_table = np.zeros((MAX_CARS_PER_LANE_STATE, MAX_CARS_PER_LANE_STATE,
MAX_CARS_PER_LANE_STATE, MAX_CARS_PER_LANE_STATE,
len(ACTIONS), # 代表当前是哪个主要流向的绿灯
NUM_ACTIONS)) # Q(s,a)
def get_sumo_executable(gui=False):
"""获取SUMO或SUMO-GUI的可执行文件路径"""
if gui:
return "sumo-gui" # 返回SUMO GUI的可执行文件名
return "sumo" # 返回SUMO命令行版本的可执行文件名
def start_simulation(sumocfg_file, gui=False):
"""启动SUMO仿真"""
sumo_cmd = [get_sumo_executable(gui), "-c", sumocfg_file] # 构建SUMO启动命令
sumo_cmd.extend(["--step-length", "1"]) # 设置仿真步长为1秒
sumo_cmd.extend(["--remote-port", "8813"]) # 指定TraCI端口
sumo_cmd.extend(["--waiting-time-memory", "1000"]) # 增加车辆等待时间记忆长度,用于获取准确的累计等待时间
# sumo_cmd.extend(["--time-to-teleport", "-1"]) # 防止车辆瞬移,让拥堵更真实
traci.start(sumo_cmd, port=8813) # 启动SUMO并通过TraCI连接
print(f"SUMO started with command: {' '.join(sumo_cmd)}") # 打印启动命令
def get_lane_queue_length(lane_id):
"""获取指定车道的排队车辆数 (简化版,实际中用检测器更佳)"""
# 这是一个非常简化的版本,实际中应该用检测器
# 例如 traci.lanearea.getJamLengthVehicle(detector_id) 或 traci.edge.getLastStepHaltingNumber(edge_id)
# 这里我们用 traci.lane.getLastStepHaltingNumber(lane_id)
try:
return traci.lane.getLastStepHaltingNumber(lane_id) # 获取车道上一步的停止车辆数
except traci.TraCIException:
print(f"Warning: Could not get halting number for lane {lane_id}") # 打印警告信息
return 0 # 返回0,如果获取失败
def discretize_queue(queue_length):
"""将排队长度离散化"""
if queue_length <= 5: # 如果排队长度小于等于5
return 0 # 返回离散状态0
elif queue_length <= 10: # 如果排队长度小于等于10
return 1 # 返回离散状态1
else: # 其他情况
return 2 # 返回离散状态2
def get_state():
"""从SUMO获取当前状态并离散化"""
queues = [] # 初始化队列列表
for direction_lanes in INCOMING_LANES_DETECTORS.values(): # 遍历所有方向的车道
direction_queue = 0 # 初始化方向队列长度
for lane_id in direction_lanes: # 遍历该方向的所有车道
direction_queue += get_lane_queue_length(lane_id) # 累加车道队列长度
queues.append(discretize_queue(direction_queue)) # 将离散化后的队列长度添加到列表
# 获取当前信号灯相位逻辑 (哪个主要方向是绿灯)
# 我们需要将实际的相位索引映射到我们定义的简化逻辑索引 (0 for NS, 1 for EW)
current_sumo_phase = traci.trafficlight.getPhase(TLS_ID) # 获取当前SUMO相位索引
# 这个映射逻辑非常重要,需要根据你的信号灯定义来确定
# 假设 PHASE_NS_GREEN_INDEX (e.g., 0) 对应我们的逻辑状态 0
# 假设 PHASE_EW_GREEN_INDEX (e.g., 2) 对应我们的逻辑状态 1
# 其他黄灯、红灯相位,我们需要判断它们之前是哪个绿灯相位
# 这是一个简化逻辑,实际中可能需要更复杂的判断或记录上一个绿灯相位
# 为了简单,我们直接判断当前相位是否是我们定义的两个主绿灯相位之一
current_phase_logic_index = 0 # 默认为NS方向绿灯的逻辑索引
if current_sumo_phase == PHASE_EW_GREEN_INDEX: # 如果当前SUMO相位是东西绿灯
current_phase_logic_index = 1 # 设置逻辑索引为1 (EW)
elif current_sumo_phase == PHASE_NS_GREEN_INDEX: # 如果当前SUMO相位是南北绿灯
current_phase_logic_index = 0 # 设置逻辑索引为0 (NS)
else:
# 如果是黄灯或全红,我们可能需要知道它从哪个绿灯转换而来
# 为了简化,如果不是明确的NS或EW绿灯,我们可能基于上一个动作或一个默认值
# 这里我们先假设它保持上一个逻辑状态,或者需要一个全局变量来跟踪
# 在这个例子中,我们期望动作总是切换到明确的NS绿灯或EW绿灯之一
# 如果是黄灯,它通常属于前一个绿灯阶段的延续。
# 更鲁棒的方法是记录上一个 chosen_action (代表的逻辑相位)
# 这里我们尝试从 traci.trafficlight.getProgram(TLS_ID) 和 traci.trafficlight.getPhase(TLS_ID)
# 来推断当前是哪个方向的绿灯亮起。
# 对于Q-learning,状态定义需要稳定。
# 假设我们的动作直接设定到 NS_GREEN 或 EW_GREEN, 黄灯是过渡
# 那么 current_phase_logic_index 可以反映当前哪个方向 *应该* 是绿灯
# 或者我们可以更简单地,将当前实际的SUMO相位索引作为状态的一部分,
# 但这会增加状态空间的维度。
# 此处的简化:如果不是我们关心的两个主绿灯,就看上一次的动作是哪个。
# 为了Q-table的索引,这个current_phase_logic_index必须是0或1.
# 我们假设动作执行后,相位会变成我们期望的主绿灯相位之一
# 所以,在get_state()时,这个值反映了*当前*哪个主要方向是绿灯。
# 这个逻辑需要根据实际的信号灯配置来精确化。
# 例如,如果你的相位0是NS绿,相位1是NS黄,相位2是EW绿,相位3是EW黄
# 当 current_sumo_phase 是 0 或 1 时,逻辑上是NS绿。
# 当 current_sumo_phase 是 2 或 3 时,逻辑上是EW绿。
# 在这个示例中,我们之前定义了ACTIONS = [PHASE_NS_GREEN_INDEX, PHASE_EW_GREEN_INDEX]
# 所以,如果当前相位是 PHASE_NS_GREEN_INDEX, 我们的逻辑相位就是0
# 如果当前相位是 PHASE_EW_GREEN_INDEX, 我们的逻辑相位就是1
# 如果是黄灯,它属于前一个绿灯。
# traci.trafficlight.getPhase(TLS_ID) 返回的是程序中的绝对相位索引
program_logic = traci.trafficlight.getCompleteRedYellowGreenDefinition(TLS_ID)[0] # 获取当前程序逻辑
current_phase_definition = program_logic.phases[current_sumo_phase].state # 获取当前相位的灯色字符串
# 一个更可靠的方法是检查当前相位定义中哪个方向是绿灯
# 例如,如果 'G' 或 'g' 出现在代表南北向的灯位上
# 但这依赖于灯色字符串的复杂解析。
# 简化:如果当前是南北绿,phase_logic_index = 0, 如果是东西绿,则为 1
# 黄灯阶段通常意味着前一个绿灯阶段的结束。
# 为了Q-table索引,我们需要一个明确的逻辑状态。
# 假设我们总是从一个绿灯相位切换到另一个绿灯相位(中间有黄灯)
# state 中的 current_phase_logic_index 代表“哪个方向刚刚或正在经历绿灯”
# 如果 traci.trafficlight.getPhase(TLS_ID) 是 PHASE_NS_GREEN_INDEX -> 0
# 如果 traci.trafficlight.getPhase(TLS_ID) 是 PHASE_EW_GREEN_INDEX -> 1
# 如果是黄灯,它紧随绿灯之后。
# 这个state的 current_phase_logic_index 代表上一个或当前的主要绿灯方向。
# 更好的做法可能是,这个状态元素代表“即将为哪个方向开绿灯的决策”已做出,
# 或者直接使用上一个动作作为状态的一部分。
# 为了Q-table的稳定,我们假设 current_phase_logic_index 代表当前 *主要* 的绿灯方向
# 如果当前是黄灯,它通常指示着从哪个绿灯相位转换而来。
# e.g. if current_sumo_phase is yellow after NS_GREEN, logic is still NS (0)
# 为了简单,我们假设动作执行后,相位会变成我们期望的主绿灯相位之一
# 然后我们才获取状态。所以这里应该反映的是已设置的绿灯。
# 实际中,RL的状态需要精确定义。
# 让我们假设一个全局变量 `last_chosen_logic_phase` 来记录上一个选择的逻辑相位
# 并在动作执行后更新它。
# state_tuple = tuple(queues + [last_chosen_logic_phase]) # 拼接成元组作为状态
# 另一种方式:直接从当前相位判断。
# 假设 phase 0, 1 是 NS (绿,黄),phase 2, 3 是 EW (绿,黄)
# if current_sumo_phase in [0, 1]: current_phase_logic_index = 0
# elif current_sumo_phase in [2, 3]: current_phase_logic_index = 1
# 这个需要根据你的 `TLS_ID` 的具体相位定义来确定。
# 对于本例,我们直接使用我们定义的 `ACTIONS` 列表中的索引。
# 如果当前相位是ACTIONS[0] (PHASE_NS_GREEN_INDEX),则逻辑相位为0.
# 如果当前相位是ACTIONS[1] (PHASE_EW_GREEN_INDEX),则逻辑相位为1.
# 黄灯阶段较为复杂,它通常与前一个绿灯相位关联。
# 为了Q表查找,这个逻辑索引必须是确定的。
# 我们假设状态是在“绿灯”期间或即将进入绿灯时评估的。
# 这里的 current_phase_logic_index 指的是当前哪个 *逻辑* 方向是绿灯。
# 0 代表南北,1 代表东西。
active_program_id = traci.trafficlight.getProgram(TLS_ID) # 获取当前活动的程序ID
all_logics = traci.trafficlight.getAllProgramLogics() # 获取所有程序逻辑
current_logic = None # 初始化当前逻辑
for logic in all_logics: # 遍历所有逻辑
if logic.programID == active_program_id: # 如果找到当前活动的程序ID
current_logic = logic # 设置为当前逻辑
break # 跳出循环
if current_logic: # 如果找到了当前逻辑
# 假设 PHASE_NS_GREEN_INDEX 是南北绿灯相位在当前 program 中的索引
# 假设 PHASE_EW_GREEN_INDEX 是东西绿灯相位在当前 program 中的索引
# 我们需要判断当前相位 `current_sumo_phase` 属于哪个逻辑方向。
# 例如,如果 `current_sumo_phase` 等于我们定义的南北绿灯相位索引,
# 或者与南北绿灯相关联的黄灯相位索引。
# 这是一个困难点,因为SUMO的相位索引是绝对的,而我们的逻辑相位是抽象的。
# 简化:我们假设 `last_action_logic_index` 是一个全局变量,在执行动作后更新。
# `state = tuple(queues + [last_action_logic_index])`
# 为避免引入全局变量,我们尝试从当前相位推断。
# 如果当前相位是`PHASE_NS_GREEN_INDEX`,则逻辑相位为0。
# 如果当前相位是`PHASE_EW_GREEN_INDEX`,则逻辑相位为1。
# 对于黄灯,它属于前一个绿灯。
# 让我们用一个更简单的状态,只基于队列。
# 并且,当前相位信息由Q-table的结构隐式处理,或者我们让动作决定下一个相位,
# 而不是将当前相位作为状态的一部分。
# 如果动作是“切换到NS”或“切换到EW”,那么当前相位信息可能不是那么重要了,
# 因为Q值是 Q(s, a),其中a已经是决策。
# 让我们暂时移除 current_phase_logic_index from state for simplification of Q-table,
# and make the Q-table Q[q_N][q_S][q_E][q_W][action_index]
# q_table = np.zeros((MAX_CARS_PER_LANE_STATE, ..., NUM_ACTIONS))
# 但是标准的Q-learning是 Q(s,a),所以状态中包含当前相位通常是必要的。
# 重新考虑:state 中包含当前哪个 *主要方向* 是绿灯。
# 0 for NS, 1 for EW.
# 这个映射需要你根据自己的 .net.xml 中的 <tlLogic> 定义。
# e.g. if phases 0,1,2 are for NS (G, y, G_ped), and 3,4,5 for EW (G,y,G_ped)
# then if current_sumo_phase in [0,1,2], current_phase_logic_index = 0
# else current_phase_logic_index = 1
# 为了本代码的通用性,我们假设一个函数 `get_current_logic_phase_index(current_sumo_phase)`
# 在这里我们硬编码一个简单的映射,假设PHASE_NS_GREEN_INDEX和相关的黄灯属于逻辑0,
# PHASE_EW_GREEN_INDEX和相关的黄灯属于逻辑1。
# 假设黄灯相位紧随绿灯相位。
# if current_sumo_phase == PHASE_NS_GREEN_INDEX or current_sumo_phase == PHASE_NS_GREEN_INDEX + 1:
# current_phase_logic_index = 0
# elif current_sumo_phase == PHASE_EW_GREEN_INDEX or current_sumo_phase == PHASE_EW_GREEN_INDEX + 1:
# current_phase_logic_index = 1
# 这个逻辑需要用户根据自己的信号灯定义来适配。
# 我们将假设 `current_phase_logic_index` 来源于上一次执行的动作的意图。
# 或者,更简单,我们直接用当前SUMO相位模除一个基数(如果相位有规律)
# 为了这个例子,我们将使用一个全局变量来跟踪上一个选择的逻辑相位。
global last_selected_logic_phase_index # 声明使用全局变量
state_tuple = tuple(queues + [last_selected_logic_phase_index]) # 组成状态元组
return state_tuple # 返回状态元组
return tuple([0]*len(INCOMING_LANES_DETECTORS) + [0]) # 如果无法获取逻辑,返回默认状态
def get_reward(current_queues_raw):
"""计算奖励,负的当前总排队长度"""
# current_queues_raw 是一个包含各方向原始车辆数的列表或元组
return -sum(current_queues_raw) # 返回负的总排队长度
def choose_action(state, epsilon):
"""使用epsilon-greedy策略选择动作"""
if random.random() < epsilon: # 如果随机数小于epsilon
return random.choice(range(NUM_ACTIONS)) # 随机选择一个动作 (探索)
else: # 否则
# state是一个元组 (qN, qS, qE, qW, current_phase_logic_idx)
# q_table[state] 会因为 state 是元组而直接索引
return np.argmax(q_table[state]) # 选择Q值最大的动作 (利用)
def run_episode(episode_num, current_epsilon, sumocfg_file, gui=False):
"""一个训练轮次"""
global last_selected_logic_phase_index # 声明使用全局变量
start_simulation(sumocfg_file, gui=gui) # 启动仿真
# 初始设置一个默认的逻辑相位,例如南北绿灯
last_selected_logic_phase_index = 0 # 假设初始是南北绿灯逻辑
traci.trafficlight.setPhase(TLS_ID, ACTIONS[last_selected_logic_phase_index]) # 设置初始相位
# 为了获取初始状态,先几步让交通流稳定一下,并应用初始相位
for _ in range(5): # 5个仿真步
traci.simulationStep() # 执行一步仿真
current_state = get_state() # 获取初始状态
total_reward_this_episode = 0 # 初始化本轮总奖励
# 为了计算奖励,我们需要未离散化的队列长度
raw_queues_for_reward = [] # 初始化原始队列长度列表
for direction_lanes in INCOMING_LANES_DETECTORS.values(): # 遍历所有方向的车道
direction_queue = 0 # 初始化方向队列长度
for lane_id in direction_lanes: # 遍历该方向所有车道
direction_queue += get_lane_queue_length(lane_id) # 累加车道队列长度
raw_queues_for_reward.append(direction_queue) # 添加到原始队列列表
simulation_time = 0 # 初始化仿真时间
while simulation_time < SIMULATION_STEPS_PER_EPISODE: # 当仿真时间小于每轮最大步数
action_logic_index = choose_action(current_state, current_epsilon) # 选择动作(逻辑索引 0 或 1)
chosen_sumo_phase = ACTIONS[action_logic_index] # 将逻辑动作索引映射到SUMO相位索引
traci.trafficlight.setPhase(TLS_ID, chosen_sumo_phase) # 应用选择的SUMO相位
last_selected_logic_phase_index = action_logic_index # 更新上一个选择的逻辑相位索引
# 让选定的相位持续一段时间
for _ in range(ACTION_DURATION): # 循环相位持续时间
if simulation_time >= SIMULATION_STEPS_PER_EPISODE: # 如果达到最大步数
break # 跳出循环
traci.simulationStep() # 执行一步仿真
simulation_time += 1 # 仿真时间加1
if simulation_time >= SIMULATION_STEPS_PER_EPISODE: # 再次检查是否达到最大步数
# 在结束前获取最后的状态和奖励
next_raw_queues_for_reward = [] # 初始化下一个原始队列长度列表
for direction_lanes in INCOMING_LANES_DETECTORS.values(): # 遍历所有方向的车道
direction_queue = 0 # 初始化方向队列长度
for lane_id in direction_lanes: # 遍历该方向所有车道
direction_queue += get_lane_queue_length(lane_id) # 累加车道队列长度
next_raw_queues_for_reward.append(direction_queue) # 添加到下一个原始队列列表
reward = get_reward(next_raw_queues_for_reward) # 计算奖励
next_state = get_state() # 获取下一个状态(虽然可能不会用于更新,因为是episode末尾)
break # 结束循环
next_raw_queues_for_reward = [] # 初始化下一个原始队列长度列表
for direction_lanes in INCOMING_LANES_DETECTORS.values(): # 遍历所有方向的车道
direction_queue = 0 # 初始化方向队列长度
for lane_id in direction_lanes: # 遍历该方向所有车道
direction_queue += get_lane_queue_length(lane_id) # 累加车道队列长度
next_raw_queues_for_reward.append(direction_queue) # 添加到下一个原始队列列表
reward = get_reward(next_raw_queues_for_reward) # 计算奖励
next_state = get_state() # 获取下一个状态
# Q-table 更新
# current_state 是一个元组,可以直接用作q_table的索引
# action_logic_index 是选择的动作索引 (0 或 1)
best_next_action = np.argmax(q_table[next_state]) # 找到下一个状态下Q值最大的动作
td_target = reward + GAMMA * q_table[next_state][best_next_action] # 计算TD目标值
td_delta = td_target - q_table[current_state][action_logic_index] # 计算TD误差
q_table[current_state][action_logic_index] += ALPHA * td_delta # 更新Q值
current_state = next_state # 更新当前状态
raw_queues_for_reward = next_raw_queues_for_reward # 更新原始队列长度
total_reward_this_episode += reward # 累加奖励
traci.close() # 关闭TraCI连接
print(f"Episode {episode_num + 1}: Total Reward: {total_reward_this_episode}, Epsilon: {current_epsilon:.3f}") # 打印轮次信息
return total_reward_this_episode # 返回本轮总奖励
# ---- 全局变量,用于 get_state ----
# 这个全局变量的引入是为了简化状态表示中“当前逻辑相位”的获取。
# 在更复杂的实现中,这部分状态可以从历史动作或者对SUMO相位的详细解析中得到。
last_selected_logic_phase_index = 0 # 初始化为0 (例如,代表NS绿灯逻辑)
# 主训练循环
if __name__ == '__main__':
sumocfg_file_path = "your_scenario.sumocfg" # 指定你的SUMO配置文件路径
# 请确保 "your_scenario.sumocfg" 文件存在,并且包含名为 "J1" 的交通灯
# 以及在 INCOMING_LANES_DETECTORS 中定义的车道
# 并且交通灯 "J1" 的程序中,相位 ACTIONS[0] 和 ACTIONS[1] 是有效的绿灯相位。
# 示例:创建一个简单的SUMO场景文件(如果它们不存在)
# 为了能,你需要一个最基本的 .net.xml, .rou.xml, .sumocfg
# network_file = "rl_intersection.net.xml"
# route_file = "rl_routes.rou.xml"
# config_file = "rl_config.sumocfg"
#
# if not os.path.exists(network_file):
# with open(network_file, "w") as f:
# f.write("""<network>
# <node id="J1" x="0.0" y="0.0" type="traffic_light"/>
# <node id="N1" x="0.0" y="100.0" type="priority"/>
# <node id="S1" x="0.0" y="-100.0" type="priority"/>
# <node id="E1" x="100.0" y="0.0" type="priority"/>
# <node id="W1" x="-100.0" y="0.0" type="priority"/>
#
# <edge id="N_in" from="N1" to="J1" priority="1">
# <lane id="lane_N_in_0" index="0" speed="13.89" length="100.0" shape="0.0,95.0 0.0,5.0"/>
# </edge>
# <edge id="S_in" from="S1" to="J1" priority="1">
# <lane id="lane_S_in_0" index="0" speed="13.89" length="100.0" shape="0.0,-95.0 0.0,-5.0"/>
# </edge>
# <edge id="E_in" from="E1" to="J1" priority="1">
# <lane id="lane_E_in_0" index="0" speed="13.89" length="100.0" shape="95.0,0.0 5.0,0.0"/>
# </edge>
# <edge id="W_in" from="W1" to="J1" priority="1">
# <lane id="lane_W_in_0" index="0" speed="13.89" length="100.0" shape="-95.0,0.0 -5.0,0.0"/>
# </edge>
#
# <edge id="out_N" from="J1" to="N1" priority="1"/>
# <edge id="out_S" from="J1" to="S1" priority="1"/>
# <edge id="out_E" from="J1" to="E1" priority="1"/>
# <edge id="out_W" from="J1" to="W1" priority="1"/>
#
# <tlLogic id="J1" type="static" programID="0" offset="0">
# <phase duration="30" state="GrGr"/> <!-- NS Green (Incorrect, should be e.g. GGrrGGrr for 4 approaches) -->
# <phase duration="5" state="yryr"/> <!-- NS Yellow -->
# <phase duration="30" state="rGrG"/> <!-- EW Green -->
# <phase duration="5" state="ryry"/> <!-- EW Yellow -->
# </tlLogic>
# <!-- A more correct phase definition for a 4-arm intersection:
# J1 has incoming edges: E_in, N_in, S_in, W_in (order matters for 'state' string)
# Connections:
# E_in -> out_W, out_N, out_S
# N_in -> out_S, out_E, out_W
# S_in -> out_N, out_E, out_W
# W_in -> out_E, out_N, out_S
# Assuming standard 12-signal group (3 per approach: Left, Through, Right)
# State string: E_L, E_T, E_R, N_L, N_T, N_R, S_L, S_T, S_R, W_L, W_T, W_R
# Example state string for NS Green (N-S through, N-S Right, S-N Right):
# Phase 0 (NS Green): "rrgGgrrgGgrr" (assuming E,N,S,W order for approaches in TL definition)
# Phase 1 (NS Yellow): "rryygrryygrr"
# Phase 2 (EW Green): "GgrrGgrrggGG"
# Phase 3 (EW Yellow): "yyrryyrrggYY"
# The PHASE_NS_GREEN_INDEX and PHASE_EW_GREEN_INDEX must match these.
# For this example code, let's assume a simplified phase definition:
# <tlLogic id="J1" type="static" programID="0" offset="0">
# <phase duration="30" state="GGggrrrrGGgg"/> <!-- NS Green (index 0) -->
# <phase duration="4" state="yyyyrrrryyyy"/> <!-- NS Yellow (index 1) -->
# <phase duration="30" state="rrrrGGggrrrr"/> <!-- EW Green (index 2) -->
# <phase duration="4" state="rrrryyyyrrrr"/> <!-- EW Yellow (index 3) -->
# </tlLogic>
# Then PHASE_NS_GREEN_INDEX = 0, PHASE_EW_GREEN_INDEX = 2
# The `state` attribute in <phase> refers to the signal states for all links controlled by the TLS.
# The order of characters in `state` corresponds to the order of <connection> elements for that TLS in the .net.xml file
# OR, if `tlType="static"` and no connections specified, it's based on incoming lane order.
# It's CRUCIAL that ACTIONS[0] and ACTIONS[1] correctly set the desired green phases.
# The get_state() function's logic for `current_phase_logic_index` also depends on this.
#</network>""")
# print(f"Created dummy {network_file}")
#
# if not os.path.exists(route_file):
# with open(route_file, "w") as f:
# f.write("""<routes>
# <vType id="car" accel="2.6" decel="4.5" sigma="0.5" length="5" maxSpeed="13.89" />
# <flow id="flow_N_S" type="car" from="N_in" to="out_S" begin="0" end="3600" number="300"/>
# <flow id="flow_S_N" type="car" from="S_in" to="out_N" begin="0" end="3600" number="300"/>
# <flow id="flow_E_W" type="car" from="E_in" to="out_W" begin="0" end="3600" number="300"/>
# <flow id="flow_W_E" type="car" from="W_in" to="out_E" begin="0" end="3600" number="300"/>
# </routes>""")
# print(f"Created dummy {route_file}")
#
# if not os.path.exists(config_file):
# with open(config_file, "w") as f:
# f.write(f"""<configuration>
# <input>
# <net-file value="{network_file}"/>
# <route-files value="{route_file}"/>
# </input>
# <time>
# <begin value="0"/>
# <end value="4000"/>
# </time>
# </configuration>""")
# print(f"Created dummy {config_file}")
# sumocfg_file_path = config_file # 使用新创建的配置文件
# 确保你的配置文件路径是正确的
if not os.path.exists(sumocfg_file_path): # 如果配置文件不存在
print(f"Error: SUMO configuration file '{sumocfg_file_path}' not found.") # 打印错误信息
print("Please create the SUMO scenario files (e.g., .net.xml, .rou.xml, .sumocfg) first.") # 提示用户创建文件
print("The above commented out code can generate placeholder files, but you'll need to define") # 提示用户注意占位符文件
print("TLS 'J1' with phases corresponding to PHASE_NS_GREEN_INDEX and PHASE_EW_GREEN_INDEX.") # 提示用户定义交通灯和相位
sys.exit(1) # 退出程序
epsilon = EPSILON_START # 初始化epsilon
all_episode_rewards = [] # 初始化所有轮次奖励列表
for episode in range(NUM_EPISODES): # 循环指定的轮次数
# GUI 仅用于调试少数几个轮次
use_gui = False # 设置是否使用GUI
if episode % 100 == 0 or episode < 3 : # 每100轮或前3轮
# use_gui = True # 开启GUI(如果你想观察)
pass # 不开启GUI以加速训练
total_reward = run_episode(episode, epsilon, sumocfg_file_path, gui=use_gui) # 一个轮次
all_episode_rewards.append(total_reward) # 添加本轮奖励到列表
epsilon = max(EPSILON_END, epsilon * EPSILON_DECAY) # 更新epsilon,确保不低于最小值
print("Training finished.") # 打印训练完成信息
# 可以选择保存Q表
# np.save("q_table_traffic.npy", q_table)
# print("Q-table saved.")
# 绘制学习曲线 (可选)
try:
import matplotlib.pyplot as plt # 导入matplotlib库
plt.plot(all_episode_rewards) # 绘制奖励曲线
plt.xlabel("Episode") # 设置x轴标签
plt.ylabel("Total Reward") # 设置y轴标签
plt.title("Q-learning Training Progress for Traffic Signal Control") # 设置标题
plt.savefig("q_learning_rewards.png") # 保存图像
plt.show() # 显示图像
except ImportError:
print("Matplotlib not installed, skipping reward plot.") # 如果未安装matplotlib,则跳过绘图
python

重要说明和改进方向 :
SUMO 场景配置 :
* 你需要一个 `.sumocfg` 文件,它指向一个路网文件 (`.net.xml`) 和一个车流文件 (`.rou.xml`)。
* 路网文件中必须定义一个交通信号灯,其 ID 与代码中的 `TLS_ID` (例如 `"J1"`) 匹配。
* 信号灯的相位定义 (`<tlLogic>`) 需要与代码中的 `PHASE_NS_GREEN_INDEX` 和 `PHASE_EW_GREEN_INDEX` 以及 `ACTIONS` 列表正确对应。例如,如果 `ACTIONS = [0, 2]`,那么在 `traci.trafficlight.setPhase(TLS_ID, 0)` 时,SUMO 应该激活南北绿灯;`setPhase(TLS_ID, 2)` 时激活东西绿灯。
* 你需要定义检测器 (如 `e1Detector` 或 `e2Detector`/`laneAreaDetector`) 来准确获取排队长度或车流量。代码中 `get_lane_queue_length` 使用 `getLastStepHaltingNumber` 是一个简化,实际中检测器数据更可靠。`INCOMING_LANES_DETECTORS` 应该映射到这些检测器的ID。
状态表示 (get_state):
* 代码中 `get_state()` 函数对于 `current_phase_logic_index` 的处理是一个难点,因为它依赖于对 SUMO 实际相位到抽象逻辑相位的正确映射。使用全局变量 `last_selected_logic_phase_index` 是一种简化处理。
* 更鲁棒的方法是仔细解析 `traci.trafficlight.getCompleteRedYellowGreenDefinition(TLS_ID)[0].phases[current_sumo_phase].state` (灯色字符串) 来判断当前哪个方向是绿灯,或者将历史动作序列作为状态的一部分。
* 排队长度的离散化阈值 (`MAX_CARS_PER_LANE_STATE` 和 `discretize_queue` 函数) 需要根据你的路网和流量进行调整。
奖励函数 (get_reward):
* 当前奖励是负的总排队长度。可以尝试其他奖励函数,如负的累积等待时间 (`traci.lane.getWaitingTime(lane_id)`) 的总和,或者通过吞吐量。
动作执行与持续时间 (ACTION_DURATION):
* `ACTION_DURATION` 定义了每个选定相位至少持续多长时间。这个值需要合理设置,太短会导致频繁切换,太长则响应不及时。通常信号灯有最小绿灯时间和最大绿灯时间。
黄灯和全红 :
* 当前代码直接切换到目标绿灯相位。SUMO 会自动处理两个绿灯相位之间的黄灯和可能的红灯过渡(如果它们在 `tlLogic` 中定义了)。确保你的 `tlLogic` 中包含黄灯相位。`setPhase` 命令会直接跳转到指定索引的相位。
Q 表大小 :
* 如果状态空间非常大 (例如,很多进口道,或者排队长度离散化等级很多),Q 表会变得非常巨大,导致学习缓慢且内存消耗高。这时就需要 DQN 等方法。
仿真文件生成 :
* 代码中注释掉的部分展示了如何用 Python 脚本生成最简单的 `.net.xml`, `.rou.xml`, 和 `.sumocfg` 文件。你需要根据自己的需求调整这些文件,特别是 `tlLogic` 部分,使其与 `PHASE_NS_GREEN_INDEX` 和 `PHASE_EW_GREEN_INDEX` 的值对应。**请务必仔细检查和修改`tlLogic` 中的 `state` 字符串,以确保它正确控制了你期望的交通流向。**
全局变量last_selected_logic_phase_index:
* 这是一个为了简化 `get_state` 函数中确定当前逻辑相位而引入的全局变量。在更复杂的或面向对象的实现中,这个状态可以作为智能体内部状态的一部分进行管理。
代码 :
* 将 `sumocfg_file_path` 修改为你的 SUMO 配置文件。
* 确保 SUMO 相关的环境变量 (如 `SUMO_HOME`) 已设置。
* 安装 `numpy` (以及可选的 `matplotlib` 进行绘图)。
这个 Q-learning 示例提供了一个基础框架。实际应用中,你需要根据具体的交叉口几何形状、相位设置、交通需求等进行大量调整和优化。
第五部分:集成强化学习算法进行交通优化 (续)
5.5 从 Q-learning 到深度Q网络 (DQN)
在上一节中,我们讨论了基于 Q-learning 的自适应交通信号控制。Q-learning 的核心是维护一个 Q 表,Q[state][action],存储每个状态-动作对的价值。然而,这种方法在面临复杂交通场景时会遇到以下主要挑战:
维度灾难 (Curse of Dimensionality) :
* 真实的交通状态通常是高维的。例如,如果我们不仅仅考虑几个离散化的排队长度等级,而是想用更精细的车辆位置、速度、加速度、不同车道类型(直行、左转、右转)的排队情况、甚至周边路口的状况作为状态,那么状态空间会急剧膨胀。
* 如果状态是连续的(例如,精确的平均速度,或未离散化的排队长度),Q 表方法就无法直接应用,除非进行粗略的离散化,但这会损失信息。
* 一个包含 (N_s) 个状态和 (N_a) 个动作的 Q 表需要 (N_s \times N_a) 个条目。当 (N_s) 变得非常大时,存储和更新 Q 表都变得不可行。
泛化能力弱 (Poor Generalization) :
* Q 表为每个遇到的状态-动作对独立学习 Q 值。它无法将从一个状态学到的经验泛化到相似但未曾见过的状态。这意味着智能体需要经历大量的状态才能学习到一个好的策略。
为了克服这些问题,深度学习,特别是神经网络,被引入到强化学习中,催生了深度强化学习。DQN 就是其中的一个里程碑式算法。
DQN 的核心思想 : 使用深度神经网络来近似动作价值函数 Q(s, a)。
即,我们不再用一个表格来存储 Q 值,而是训练一个函数逼近器 (function approximator) (Q(s, a; \theta)),其中 (\theta) 是神经网络的参数(权重和偏置)。这个网络通常接收状态 (s) 作为输入,输出对应每个可能动作 (a) 的 Q 值。
5.6 深度Q网络 (DQN) 的关键组成
DQN 算法的成功主要归功于以下几个关键技术:
使用神经网络作为Q函数逼近器 (Neural Network as Q-function Approximator) :
* **输入** : 状态 (s)。状态的表示方式对于神经网络的性能至关重要。它可以是一个特征向量(例如,各车道排队长度、平均速度、当前信号相位等拼接而成),或者是一个类似图像的表示(例如,将交叉口区域网格化,每个网格单元表示车辆密度或存在性)。
* **输出** : 对于离散动作空间,网络通常输出一个向量,向量的每个元素对应一个动作的 Q 值。例如,如果交通信号灯有4个可能的相位切换动作,网络就输出一个4维向量 ([Q(s, a_1), Q(s, a_2), Q(s, a_3), Q(s, a_4)])。
* **架构** : 可以是全连接的多层感知机 (MLP),或者当状态具有空间结构时(如图像化表示),可以使用卷积神经网络 (CNN)。
经验回放 (Experience Replay) :
* 在与环境交互的过程中,智能体会产生一系列经验元组 (experience tuples): ((s_t, a_t, r_t, s_{t+1})),分别表示在时间 (t) 的状态、采取的动作、获得的奖励以及转移到的下一个状态。
* 直接使用连续产生的样本进行训练(在线学习)存在两个问题:
* **样本相关性高** : 连续样本之间通常高度相关,这违反了许多监督学习算法关于样本独立同分布 (i.i.d.) 的假设,可能导致训练不稳定或收敛到局部最优。
* **数据效率低** : 每个经验元组只被使用一次就被丢弃,没有充分利用。
* **经验回放机制** :
* 将智能体产生的经验元组存储在一个固定大小的缓冲区(称为回放缓冲区或经验池,Replay Buffer / Memory)。
* 在训练网络时,从回放缓冲区中随机采样一个小批量 (mini-batch) 的经验元组进行训练。
* **优点** :
* **打破样本相关性** : 随机采样使得训练数据更接近独立同分布,提高了训练的稳定性。
* **提高数据利用率** : 每个经验元组可以被多次用于训练,提高了学习效率。
* **避免遗忘** : 对于稀有但重要的经验,经验回放增加了它们被重新学习的机会。
目标网络 (Target Network) :
* 在 Q-learning 的更新规则中,TD 目标 (TD Target) 是 (y_t = r_t + \gamma \max_{a’} Q(s_{t+1}, a’; \theta))。
* 如果使用同一个网络 (\theta) 来计算当前 Q 值 (Q(s_t, a_t; \theta)) 和目标 Q 值中的 (\max_{a’} Q(s_{t+1}, a’; \theta)),会导致目标值与正在更新的参数 (\theta) 相关联。当 (\theta) 更新时,目标值也会立即变化,这可能导致训练过程的振荡和不稳定。
* **目标网络机制** :
* 使用两个结构相同但参数不同的神经网络:
* **在线网络 (Online Network / Policy Network)** : (Q(s, a; \theta)),用于选择动作和在训练时计算梯度。这个网络是实时更新的。
* **目标网络 (Target Network)** : (Q(s, a; \theta^-)),用于计算 TD 目标中的下一状态的最大 Q 值。目标网络的参数 (\theta^-) 会定期(例如每隔 C 步)从在线网络的参数 (\theta) 复制而来,或者通过软更新 (soft update) 缓慢地跟踪在线网络参数。
* TD 目标变为 (y_t = r_t + \gamma \max_{a’} Q(s_{t+1}, a’; \theta^-))。
* **优点** :
* **稳定训练** : 通过固定目标网络参数一段时间,使得 TD 目标相对稳定,减少了自举 (bootstrapping) 过程中的振荡,使得学习过程更加平滑和稳定。
DQN 的损失函数 :
DQN 通常使用均方误差 (Mean Squared Error, MSE) 作为损失函数,来最小化预测 Q 值和目标 Q 值之间的差异:
[ L(\theta) = E_{(s,a,r,s’) \sim U(D)} \left[ \left( r + \gamma \max_{a’} Q(s’, a’; \theta^-) - Q(s, a; \theta) \right)^2 \right] ]
其中 (D) 是经验回放缓冲区,((s,a,r,s’) \sim U(D)) 表示从缓冲区中均匀采样一个经验元组。
5.7 在 SUMO 中使用 DQN 进行交通信号控制
现在我们具体讨论如何将 DQN 应用于 SUMO 中的交通信号控制。
5.7.1 状态表示 (State Representation)
这是将 DQN 应用于特定问题的关键步骤。状态表示需要为神经网络提供足够的信息,同时也要考虑计算效率和网络的可学习性。
基于特征向量 (Feature Vector-based) :
* 这是较为常见和直接的方法。将交叉口的各种交通参数数值化并拼接成一个向量。
* **可能的特征** :
* **各进口道的排队车辆数** : `traci.lanearea.getJamLengthVehicle()` (对于 e2 检测器) 或 `traci.edge.getLastStepHaltingNumber()`。
* **各进口道的排队长度 (米)** : `traci.lanearea.getJamLengthMeter()`.
* **各进口道的车辆总数** : `traci.lane.getLastStepVehicleNumber()` 或 `traci.edge.getLastStepVehicleNumber()`.
* **各进口道的平均速度** : `traci.lane.getLastStepMeanSpeed()` 或 `traci.edge.getLastStepMeanSpeed()`.
* **当前信号灯相位** : 可以是当前相位的索引 (one-hot 编码或直接作为数值特征),或者是当前相位已持续的时间。
* **时间信息** : 一天中的时刻 (可能影响交通模式)。
* **检测器占用率** : `traci.inductionloop.getTimeSinceDetection()` 或 `traci.lanearea.getLastStepOccupancy()`.
* **归一化** : 神经网络对输入的尺度敏感,因此通常需要对这些原始特征进行归一化 (例如,除以一个预估的最大值,或者使用 min-max scaling, z-score normalization)。
* **示例状态向量** : `[q_N, q_S, q_E, q_W, phase_idx, phase_duration, speed_N, speed_S, speed_E, speed_W, ...]`
基于网格/图像 (Grid/Image-based) :
* 将交叉口及其附近区域划分为一个二维网格。每个网格单元可以表示该区域的某些属性,形成一个或多个通道的 “图像”。
* **通道可以包括** :
* **车辆存在性/密度图** : 网格单元中是否有车,或车辆数量。
* **车辆速度图** : 网格单元中车辆的平均速度。
* **排队图** : 标记出正在排队的车辆所在的网格。
* 这种表示方法天然适合使用卷积神经网络 (CNN) 来提取空间特征。
* **优点** : 可以捕捉到更精细的空间信息和车辆分布模式。
* **挑战** : 实现复杂度较高,需要将 SUMO 中的车辆坐标映射到网格;计算量也可能更大。
混合表示 : 结合特征向量和图像化表示。
对于初次尝试,基于特征向量的方法通常更容易实现。
5.7.2 动作空间 (Action Space)
与 Q-learning 类似,动作空间通常是离散的。
- 选择下一个信号相位 : 例如,对于一个四相位控制的交叉口,动作可以是选择四个相位中的一个。
- 保持或切换 : 对于只有两个主要流向的交叉口,动作可以是“保持当前相位”或“切换到另一个主要相位”。
- 如果当前是 NS 绿,动作0=保持NS绿,动作1=切换到EW绿。
- 如果当前是 EW 绿,动作0=保持EW绿,动作1=切换到NS绿。
这种定义需要注意状态中包含当前相位信息。
5.7.3 奖励函数 (Reward Function)
奖励函数的设计对 DQN 的性能至关重要。目标是引导智能体学习到期望的交通优化行为。
常用的奖励信号 :
* **负的累积车辆等待时间 (Total Waiting Time)** : `reward = -sum(traci.lane.getWaitingTime(lane_id) for lane_id in all_approach_lanes)` (需要定期获取差值,即 `prev_wait_time - current_wait_time`)。
* **负的交叉口总排队长度 (Total Queue Length)** : `reward = -sum(traci.lanearea.getJamLengthVehicle(det_id) for det_id in all_detectors)`。
* **负的交叉口总延误 (Total Delay)** : SUMO 本身可以输出行程时间信息,延误可以基于此计算。`traci.vehicle.getAccumulatedWaitingTime()`。
* **交叉口吞吐量 (Throughput)** : `reward = number_of_vehicles_passed_intersection_in_last_step`。
* **压力 (Pressure)** : 基于交叉口不同转向的“压力”概念,通常定义为入流量和出流量(或排队)之间的差异。
* **避免频繁切换的惩罚** : 可以在奖励中加入一个小的负值,如果智能体选择了与上一时刻不同的相位。
奖励塑形 (Reward Shaping) : 有时,稀疏的奖励(例如,只有在一轮仿真结束时才给出总奖励)可能使学习非常困难。奖励塑形是指设计更频繁、更具指导性的中间奖励,但这需要小心,以避免引入意外的偏见。
5.7.4 网络架构 (Network Architecture)
对于特征向量输入 : 通常使用几层全连接层 (Dense/Linear layers) 配合 ReLU 或其他激活函数。
* 输入层神经元数量 = 状态向量的维度。
* 隐藏层可以有1到3层,每层神经元数量可以是 (例如) 64, 128, 256 等。
* 输出层神经元数量 = 动作空间的维度,通常不加激活函数(因为输出的是Q值,可以是任意实数)。
Input (state_dim) -> Dense(128) -> ReLU -> Dense(128) -> ReLU -> Dense(num_actions) -> Output (Q-values)
对于图像输入 : 通常使用卷积层 (Convolutional Layers) 来提取空间特征,然后接全连接层。
* `Input (height, width, channels) -> Conv2D -> ReLU -> Pool -> Conv2D -> ReLU -> Pool -> Flatten -> Dense -> ReLU -> Dense(num_actions) -> Output`
5.7.5 Python + TraCI + PyTorch/TensorFlow 实现 DQN (概念性代码)
我们将使用 PyTorch 来实现 DQN。TensorFlow 的实现思路类似。
首先,定义 DQN 智能体所依赖的组件:
- ReplayBuffer (经验回放缓冲区)
import random
from collections import deque, namedtuple
import torch # 导入PyTorch库
# 定义经验元组的结构
Experience = namedtuple('Experience',
('state', 'action', 'next_state', 'reward', 'done')) # 定义一个名为Experience的具名元组,包含状态、动作、下一状态、奖励和完成标志
class ReplayBuffer:
def __init__(self, capacity):
"""
初始化经验回放缓冲区。
:param capacity: int, 缓冲区的最大容量。
"""
self.memory = deque([], maxlen=capacity) # 使用双端队列作为内存,设置最大长度
def push(self, *args):
"""
将一个经验元组存入缓冲区。
*args: state, action, next_state, reward, done
"""
self.memory.append(Experience(*args)) # 将传入的参数包装成Experience对象并添加到内存中
def sample(self, batch_size):
"""
从缓冲区中随机采样一个小批量的经验。
:param batch_size: int, 采样批次的大小。
:return: list of Experience tuples.
"""
return random.sample(self.memory, batch_size) # 从内存中随机抽取指定数量的样本
def __len__(self):
"""返回当前缓冲区中的经验数量。"""
return len(self.memory) # 返回内存中元素的数量
python

- DQN Network (Q 网络模型)
假设我们的状态是特征向量。
import torch.nn as nn # 导入PyTorch神经网络模块
import torch.nn.functional as F # 导入PyTorch神经网络函数模块
class DQNNet(nn.Module):
def __init__(self, state_dim, num_actions, hidden_dim=128):
"""
初始化DQN网络。
:param state_dim: int, 输入状态的维度。
:param num_actions: int, 输出动作的数量。
:param hidden_dim: int, 隐藏层的神经元数量。
"""
super(DQNNet, self).__init__() # 调用父类的构造函数
self.fc1 = nn.Linear(state_dim, hidden_dim) # 定义第一个全连接层,输入维度为state_dim,输出维度为hidden_dim
self.fc2 = nn.Linear(hidden_dim, hidden_dim) # 定义第二个全连接层,输入维度为hidden_dim,输出维度为hidden_dim
self.fc3 = nn.Linear(hidden_dim, num_actions) # 定义第三个全连接层(输出层),输入维度为hidden_dim,输出维度为num_actions
def forward(self, x):
"""
定义网络的前向传播。
:param x: tensor, 输入的状态。
:return: tensor, 每个动作的Q值。
"""
x = F.relu(self.fc1(x)) # 通过第一个全连接层,然后应用ReLU激活函数
x = F.relu(self.fc2(x)) # 通过第二个全连接层,然后应用ReLU激活函数
return self.fc3(x) # 通过输出层,得到Q值
python

- DQNAgent (DQN 智能体)
import torch.optim as optim # 导入PyTorch优化器模块
class DQNAgent:
def __init__(self, state_dim, num_actions, replay_buffer_capacity=10000,
batch_size=128, gamma=0.99, learning_rate=1e-4,
target_update_freq=100, epsilon_start=1.0,
epsilon_end=0.05, epsilon_decay=0.995, device="cpu"):
"""
初始化DQN智能体。
:param state_dim: int, 状态维度。
:param num_actions: int, 动作数量。
:param replay_buffer_capacity: int, 经验回放区容量。
:param batch_size: int, 训练批次大小。
:param gamma: float, 折扣因子。
:param learning_rate: float, 学习率。
:param target_update_freq: int, 目标网络更新频率 (步数)。
:param epsilon_start: float, 初始探索率。
:param epsilon_end: float, 最终探索率。
:param epsilon_decay: float, 探索率衰减因子。
:param device: str, "cpu" or "cuda"。
"""
self.state_dim = state_dim # 保存状态维度
self.num_actions = num_actions # 保存动作数量
self.batch_size = batch_size # 保存批次大小
self.gamma = gamma # 保存折扣因子
self.epsilon = epsilon_start # 初始化当前探索率
self.epsilon_start = epsilon_start # 保存初始探索率
self.epsilon_end = epsilon_end # 保存最终探索率
self.epsilon_decay = epsilon_decay # 保存探索率衰减因子
self.target_update_freq = target_update_freq # 保存目标网络更新频率
self.learn_step_counter = 0 # 初始化学习步骤计数器,用于目标网络更新
self.device = torch.device(device if torch.cuda.is_available() else "cpu") # 设置设备 (GPU或CPU)
print(f"Using device: {self.device}") # 打印使用的设备
self.policy_net = DQNNet(state_dim, num_actions).to(self.device) # 创建策略网络并移动到指定设备
self.target_net = DQNNet(state_dim, num_actions).to(self.device) # 创建目标网络并移动到指定设备
self.target_net.load_state_dict(self.policy_net.state_dict()) # 将策略网络的参数复制到目标网络
self.target_net.eval() # 设置目标网络为评估模式 (不进行梯度计算和参数更新)
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate) # 定义Adam优化器,用于更新策略网络的参数
self.replay_buffer = ReplayBuffer(replay_buffer_capacity) # 创建经验回放缓冲区
def select_action(self, state):
"""
根据当前状态和epsilon-greedy策略选择动作。
:param state: numpy array or list, 当前状态。
:return: int, 选择的动作索引。
"""
if random.random() < self.epsilon: # 如果随机数小于epsilon (探索)
return random.randrange(self.num_actions) # 随机选择一个动作
else: # 否则 (利用)
with torch.no_grad(): # 不计算梯度,因为只是进行前向传播获取Q值
# 将状态转换为PyTorch张量,并增加一个批次维度 (unsqueeze(0))
state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
q_values = self.policy_net(state_tensor) # 通过策略网络计算Q值
return q_values.argmax().item() # 返回Q值最大的动作的索引
def store_transition(self, state, action, next_state, reward, done):
"""将经验元组存入回放缓冲区。"""
# 将各个部分转换为PyTorch张量,方便后续处理
state_tensor = torch.FloatTensor([state]).to(self.device) # 状态张量
action_tensor = torch.LongTensor([[action]]).to(self.device) # 动作张量 (需要是LongTensor用于gather)
# next_state可能为None (如果done=True),需要处理
if next_state is not None: # 如果下一状态存在
next_state_tensor = torch.FloatTensor([next_state]).to(self.device) # 下一状态张量
else: # 如果下一状态不存在 (episode结束)
next_state_tensor = None # 设置为None
reward_tensor = torch.FloatTensor([[reward]]).to(self.device) # 奖励张量
done_tensor = torch.FloatTensor([[done]]).to(self.device) # 完成标志张量 (0或1)
self.replay_buffer.push(state_tensor, action_tensor, next_state_tensor, reward_tensor, done_tensor) # 将张量化的经验存入缓冲区
def update_epsilon(self):
"""更新探索率epsilon。"""
self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay) # 衰减epsilon,但不低于最小值
def learn(self):
"""从回放缓冲区采样并训练策略网络。"""
if len(self.replay_buffer) < self.batch_size: # 如果缓冲区中的样本数量不足一个批次
return # 不进行学习
experiences = self.replay_buffer.sample(self.batch_size) # 从缓冲区采样一个批次的经验
# 将Experience元组列表转换为Experience的批次 (即每个字段都是一个包含batch_size个元素的张量)
batch = Experience(*zip(*experiences))
# 从批次中解构出 state, action, reward, next_state, done
# torch.cat 会将元组中的张量连接起来
state_batch = torch.cat(batch.state) # 状态批次
action_batch = torch.cat(batch.action) # 动作批次
reward_batch = torch.cat(batch.reward) # 奖励批次
# 处理 next_state_batch,因为有些可能是 None
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)),
device=self.device, dtype=torch.bool) # 创建一个布尔掩码,标记哪些下一状态不是None
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None]) # 连接所有非None的下一状态
# 计算当前状态下,实际采取动作的Q值: Q(s_t, a_t)
# policy_net(state_batch) 输出所有动作的Q值,shape: [batch_size, num_actions]
# action_batch shape: [batch_size, 1],包含每个样本实际采取的动作索引
# gather(1, action_batch) 会根据 action_batch 中的索引,在第1维度上选取Q值
current_q_values = self.policy_net(state_batch).gather(1, action_batch) # 获取当前状态-动作对的Q值
# 计算下一状态的最大Q值: max_{a'} Q_target(s_{t+1}, a')
# 初始化下一状态的Q值为全零张量
next_state_q_values = torch.zeros(self.batch_size, device=self.device)
if non_final_next_states.size(0) > 0: # 如果存在非终止的下一状态
# 只对非终止的下一状态计算目标Q值
with torch.no_grad(): # 不计算梯度,因为使用的是目标网络
next_state_q_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0].detach() # 计算下一状态的最大Q值,并分离计算图
# .max(1)[0] 返回沿维度1的最大值 (Q值),忽略索引
# 计算期望Q值 (TD Target): r_t + gamma * max_{a'} Q_target(s_{t+1}, a')
# 对于终止状态 (done=True),期望Q值就是即时奖励 r_t
# batch.done 是一个元组,包含FloatTensor,我们需要将其转换为合适的格式
done_batch_float = torch.cat(batch.done) # 完成标志批次
expected_q_values = reward_batch + (self.gamma * next_state_q_values.unsqueeze(1) * (1 - done_batch_float)) # 计算期望Q值
# (1 - done_batch_float) 确保如果 done=1,则gamma项为0
# 计算损失函数 (例如 Huber loss 或 MSE loss)
loss = F.smooth_l1_loss(current_q_values, expected_q_values) # 使用Smooth L1损失 (Huber loss)
# loss = F.mse_loss(current_q_values, expected_q_values) # 或者使用均方误差损失
# 优化模型
self.optimizer.zero_grad() # 清除之前的梯度
loss.backward() # 反向传播计算梯度
# torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100) # (可选) 梯度裁剪,防止梯度爆炸
self.optimizer.step() # 更新策略网络的参数
self.learn_step_counter += 1 # 学习步骤计数器加1
if self.learn_step_counter % self.target_update_freq == 0: # 如果达到目标网络更新频率
self.target_net.load_state_dict(self.policy_net.state_dict()) # 更新目标网络参数
# print("Target network updated.") # 打印目标网络更新信息
python

5.7.6 主训练循环 (SUMO + DQN Agent)
这个循环的结构与 Q-learning 的类似,但状态获取、动作选择和学习步骤将使用 DQN Agent。
import traci
import sumolib
import numpy as np
import os
import sys
# --- SUMO 和 TraCI 配置 (与Q-learning部分类似) ---
if 'SUMO_HOME' in os.environ: # 检查SUMO_HOME环境变量是否存在
tools = os.path.join(os.environ['SUMO_HOME'], 'tools') # 获取tools目录路径
sys.path.append(tools) # 将tools目录添加到系统路径
else:
sys.exit("please declare environment variable 'SUMO_HOME'") # 如果未设置SUMO_HOME则退出
def get_sumo_executable(gui=False): # 定义获取SUMO可执行文件名的函数
if gui: # 如果需要GUI界面
return "sumo-gui" # 返回sumo-gui
return "sumo" # 否则返回sumo
def start_simulation(sumocfg_file, gui=False, port=8813): # 定义启动SUMO仿真的函数
sumo_cmd = [get_sumo_executable(gui), "-c", sumocfg_file] # 构建SUMO启动命令列表
sumo_cmd.extend(["--step-length", "1"]) # 设置仿真步长为1秒
sumo_cmd.extend(["--remote-port", str(port)]) # 设置TraCI远程端口
sumo_cmd.extend(["--waiting-time-memory", "1000"]) # 设置车辆等待时间内存
# sumo_cmd.extend(["--time-to-teleport", "-1"]) # (可选) 禁止车辆瞬移
# sumo_cmd.extend(["--no-warnings", "true"]) # (可选) 禁止SUMO警告输出
traci.start(sumo_cmd, port=port) # 启动SUMO并连接TraCI
# print(f"SUMO started on port {port} with command: {' '.join(sumo_cmd)}") # 打印SUMO启动信息
# --- 交通场景特定参数 ---
TLS_ID = "J1" # 交通灯ID
# 假设动作: 0 = 保持当前相位逻辑, 1 = 切换到下一个预定义相位逻辑
# 或者更直接: 0 = 激活NS绿灯, 1 = 激活EW绿灯
# 这个动作定义需要与信号灯的相位程序紧密配合
# 为了简化,我们假设有两个主要相位逻辑,动作是选择其中一个
PHASE_LOGIC_NS_GREEN_SUMO_INDEX = 0 # SUMO中代表南北绿的相位索引
PHASE_LOGIC_EW_GREEN_SUMO_INDEX = 2 # SUMO中代表东西绿的相位索引
# (这些索引必须与你的 .net.xml 文件中 tlLogic 的相位定义一致)
POSSIBLE_ACTIONS = [PHASE_LOGIC_NS_GREEN_SUMO_INDEX, PHASE_LOGIC_EW_GREEN_SUMO_INDEX] # 可选的SUMO相位索引
NUM_ACTIONS = len(POSSIBLE_ACTIONS) # 动作数量
ACTION_DURATION_STEPS = 10 # 每个选定动作(相位)至少持续的仿真步数
MIN_GREEN_TIME = 10 # 最小绿灯时间 (与ACTION_DURATION_STEPS类似或结合使用)
# YELLOW_PHASE_DURATION = 3 # 黄灯持续时间 (如果需要手动控制黄灯)
# --- 状态定义 ---
# 示例:[q_N, q_S, q_E, q_W, current_phase_logic_idx (0 for NS, 1 for EW)]
# q_i 是归一化后的排队长度或车辆数
# INCOMING_LANES_DETECTORS 应该定义好,例如使用e2 LaneAreaDetectors
# "det_N_entry", "det_S_entry", "det_E_entry", "det_W_entry"
INCOMING_LANE_AREA_DETECTORS = ["e2det_N", "e2det_S", "e2det_E", "e2det_W"] # 假设的e2检测器ID
# 需要知道每个检测器对应的最大车辆数或长度,用于归一化
MAX_QUEUE_PER_DETECTOR = 50 # 假设每个检测器区域最大容纳50辆车 (用于归一化)
STATE_DIM = len(INCOMING_LANE_AREA_DETECTORS) + 1 # 状态维度 = 检测器数量 + 当前相位逻辑索引
# 全局变量,追踪当前相位逻辑 (0 for NS, 1 for EW)
current_phase_logic = 0 # 初始假设为NS绿灯逻辑
def get_normalized_queue(detector_id): # 定义获取归一化队列长度的函数
try:
# 使用 e2Detector (LaneAreaDetector) 的 getJamLengthVehicle
num_vehicles = traci.lanearea.getLastStepVehicleNumber(detector_id) # 获取检测区域内的车辆数
return min(num_vehicles / MAX_QUEUE_PER_DETECTOR, 1.0) # 归一化并确保不超过1.0
except traci.TraCIException: # 捕获TraCI异常
# print(f"Warning: TraCIException while getting queue for {detector_id}") # 打印警告
return 0.0 # 异常时返回0
def get_current_state(): # 定义获取当前状态的函数
global current_phase_logic # 使用全局变量 current_phase_logic
state_features = [] # 初始化状态特征列表
for det_id in INCOMING_LANE_AREA_DETECTORS: # 遍历所有检测器ID
state_features.append(get_normalized_queue(det_id)) # 添加归一化的队列长度到特征列表
state_features.append(current_phase_logic) # 添加当前相位逻辑 (0或1)
return np.array(state_features, dtype=np.float32) # 返回numpy数组形式的状态特征
def get_reward(): # 定义获取奖励的函数
# 示例奖励:负的交叉口总等待车辆数 (简化)
# 更复杂的奖励可以考虑总等待时间、平均速度、吞吐量等
total_halting_vehicles = 0 # 初始化总停止车辆数
for det_id in INCOMING_LANE_AREA_DETECTORS: # 遍历所有检测器ID
try:
# 如果用 getLastStepHaltingNumber,它返回的是边上的,或者需要为每个laneArea定义一个edge
# 这里我们用车辆数代替,简单起见
total_halting_vehicles += traci.lanearea.getLastStepHaltingNumber(det_id) # 累加检测区域内的停止车辆数
except traci.TraCIException: # 捕获TraCI异常
pass # 忽略异常
# 奖励可以是排队长度的减少量,或者直接是负的排队长度
return -total_halting_vehicles # 返回负的总停止车辆数
def run_one_episode(episode_num, agent, sumocfg_file, gui=False, max_steps_per_episode=3600, port=8813): # 定义单个训练轮次的函数
global current_phase_logic # 使用全局变量 current_phase_logic
start_simulation(sumocfg_file, gui=gui, port=port) # 启动SUMO仿真
# 初始化,设置一个初始相位
current_phase_logic = 0 # 假设初始是NS绿灯逻辑
traci.trafficlight.setPhase(TLS_ID, POSSIBLE_ACTIONS[current_phase_logic]) # 设置初始SUMO相位
# 几步让初始相位生效并获取初始状态
for _ in range(ACTION_DURATION_STEPS // 2): # 相位持续时间的一半
if traci.simulation.getMinExpectedNumber() <= 0: # 检查仿真是否意外结束
print(f"SUMO closed prematurely at init step in episode {episode_num}.") # 打印提前结束信息
traci.close() # 关闭TraCI连接
return 0, 0 # 返回0奖励和0步数
traci.simulationStep() # 执行一步仿真
state = get_current_state() # 获取初始状态
total_reward_episode = 0 # 初始化本轮总奖励
current_action_start_time = 0 # 当前动作开始的仿真时间
for step in range(max_steps_per_episode): # 在每轮的最大步数内循环
if traci.simulation.getMinExpectedNumber() <= 0: # 检查仿真是否意外结束
print(f"SUMO closed prematurely at step {step} in episode {episode_num}.") # 打印提前结束信息
break # 跳出循环
# 智能体决策的频率:每当一个动作(相位)持续了ACTION_DURATION_STEPS
if (traci.simulation.getTime() - current_action_start_time) >= ACTION_DURATION_STEPS: # 如果当前动作已达到持续时间
action_idx = agent.select_action(state) # 智能体选择动作 (0 或 1)
chosen_sumo_phase = POSSIBLE_ACTIONS[action_idx] # 将智能体动作映射到SUMO相位
# 更新当前相位逻辑 (如果动作是选择逻辑,而不是保持/切换)
current_phase_logic = action_idx # 更新全局相位逻辑变量
# 应用动作到SUMO
traci.trafficlight.setPhase(TLS_ID, chosen_sumo_phase) # 设置交通灯相位
current_action_start_time = traci.simulation.getTime() # 更新动作开始时间
# 在这里,我们让这个相位至少持续ACTION_DURATION_STEPS
# 所以奖励和下一状态的获取应该在这个持续期之后(或者在下一个决策点之前)
# 为了简化,我们在这里让它执行一步,然后在循环顶部检查是否该做新决策
traci.simulationStep() # 执行一步仿真
# 仅在决策点(即一个动作持续期结束后,新动作选定前)收集经验
# 或者,我们可以每一步都收集,但这样状态可能不会有太大变化
# 更常见的做法是,动作执行后,环境演化N步,然后获取(s', r)
# 这里我们简化为在下一个决策点之前获取。
# 为了训练,我们需要 (s, a, r, s', done)
# (s, a) 是上一个决策点的,r, s' 是当前这个决策点的
# 为了简化,我们假设每个ACTION_DURATION_STEPS后进行一次学习
# 这意味着 (state, action_idx) 是ACTION_DURATION_STEPS之前的
# reward 和 next_state 是在这ACTION_DURATION_STEPS之后得到的
if (traci.simulation.getTime() - current_action_start_time) < ACTION_DURATION_STEPS and step < max_steps_per_episode -1 : # 如果当前动作还未结束且不是最后一simulation步
# 在一个动作的持续期间内,我们不进行学习,只是累积经验
# 或者,可以在每个仿真步都计算奖励和下一状态,然后存储
# 这样做的话,state是当前步的,action是上个决策点的,reward是当前步的,next_state是下一步的
# 这里我们选择在每个决策周期结束时学习
continue # 继续到下一个仿真步,直到动作持续期结束
# --- 此时,一个动作周期 (ACTION_DURATION_STEPS) 结束了,或者episode结束 ---
next_state = get_current_state() # 获取新的状态
reward = get_reward() # 获取奖励
done = (step == max_steps_per_episode - 1) # 判断是否回合结束
# 这里的 action_idx 应该是导致当前 reward 和 next_state 的那个动作
# 它是在上一个 ACTION_DURATION_STEPS 开始时选的
# 我们需要确保 agent.select_action(state) 返回的 action_idx 被正确记录
# 实际上,我们应该在动作选定后,执行 ACTION_DURATION_STEPS, 然后获取(s', r)
# 然后用 (s, action_idx_prev, r, s', done) 来学习
# 修正逻辑:
# 1. 在决策点,获取当前状态 s
# 2. agent 根据 s 选择动作 a
# 3. 执行动作 a,并让仿真 ACTION_DURATION_STEPS
# 4. 在这 ACTION_DURATION_STEPS 之后,获取新的状态 s' 和奖励 r
# 5. 存储 (s, a, r, s', done)
# 6. s = s'
# 如果我们采用这种结构,那么上面 `if (traci.simulation.getTime() - current_action_start_time) >= ACTION_DURATION_STEPS:` 块
# 应该包含获取 s, 选择 a, 然后循环 ACTION_DURATION_STEPS, 然后获取 s', r, 存储, 学习。
# 为了代码简洁,我们假设 state 是在选择动作之前的状态,
# next_state 和 reward 是在动作执行了 ACTION_DURATION_STEPS 之后的状态和奖励。
# 而 action_idx 就是在 state 时选择的那个动作。
# 重新组织循环以匹配 (s,a,r,s') 结构:
# state = get_current_state() # 循环开始时获取
# for step in range(max_steps_per_episode):
# action_idx = agent.select_action(state)
# chosen_sumo_phase = POSSIBLE_ACTIONS[action_idx]
# current_phase_logic = action_idx
# traci.trafficlight.setPhase(TLS_ID, chosen_sumo_phase)
#
# accumulated_reward_for_action = 0
# for _ in range(ACTION_DURATION_STEPS):
# if traci.simulation.getMinExpectedNumber() <= 0: break
# traci.simulationStep()
# accumulated_reward_for_action += get_reward() # 这里的奖励定义需要调整,可能是每步奖励的均值或总和
# if traci.simulation.getMinExpectedNumber() <= 0: break
#
# next_state = get_current_state()
# reward_for_buffer = accumulated_reward_for_action # (或者使用在ACTION_DURATION_STEPS结束时的瞬时奖励)
# done = (step >= max_steps_per_episode - ACTION_DURATION_STEPS) # 调整done的判断
# agent.store_transition(state, action_idx, next_state, reward_for_buffer, done)
# agent.learn()
# state = next_state
# total_reward_episode += reward_for_buffer
# agent.update_epsilon()
# 当前代码的逻辑 (每ACTION_DURATION_STEPS决策一次,学习一次):
# `state` 是这个决策周期开始时的状态。
# `action_idx` 是基于 `state` 选择的动作。
# `next_state` 和 `reward` 是在执行 `action_idx` 并 `ACTION_DURATION_STEPS` 后的结果。
# 注意:之前的代码 `action_idx` 是在 `if` 块内定义的,其作用域可能导致问题。
# 我们需要将 `action_idx` 的获取移到存储转换之前。
# 让我们重新审视主循环逻辑,使其更清晰:
# (我们将采用在每个动作持续期结束后进行一次转换存储和学习的模式)
# 假设 `state` 是在当前决策点获取的
# `action_idx_for_this_period` 是 agent 根据 `state` 选择的
# 然后模拟 `ACTION_DURATION_STEPS`
# `next_state_after_period` 和 `reward_after_period` 是这个周期结束时得到的
# 存储 (state, action_idx_for_this_period, reward_after_period, next_state_after_period, done)
# **为了让当前代码结构能跑起来,我们假设`action_idx`是在每个周期开始时选取的**
# **并且 `state` 是那个周期的初始状态。`reward` 和 `next_state` 是周期结束时的。**
# 这种情况下,我们需要一个变量来保存上一个周期选择的动作。
# 我们暂时保持之前的简化逻辑,但要注意其局限性。
# (在更完整的实现中,(s,a) -> (r,s') 的对应关系需要非常清晰)
# 为了能够将(s,a,r,s',done)正确地存入buffer,我们需要确保这些变量在正确的时间点被捕获。
# 在当前的每 ACTION_DURATION_STEPS 决策一次的框架下:
# state_at_decision = get_current_state()
# action_chosen = agent.select_action(state_at_decision)
# ... (执行动作,模拟 ACTION_DURATION_STEPS) ...
# reward_received = get_reward() (在周期末尾)
# next_state_at_decision = get_current_state() (在周期末尾)
# agent.store_transition(state_at_decision, action_chosen, next_state_at_decision, reward_received, done)
# 在当前代码中,`state` 是在 `ACTION_DURATION_STEPS` 开始前获得的。
# `action_idx` 是在 `ACTION_DURATION_STEPS` 开始前选择的。
# `next_state` 和 `reward` 是在 `ACTION_DURATION_STEPS` 结束后获得的。
# 这是正确的对应关系,但action_idx需要在if外部能访问到。
# 为了简单演示,我们将采用以下结构:
# 在每个时间步 `t`:
# 1. 如果是决策步 (e.g., t % ACTION_DURATION_STEPS == 0):
# a. 获取当前状态 `s_t`
# b. Agent 选择动作 `a_t` (此动作将在接下来的 ACTION_DURATION_STEPS 内生效)
# c. 如果不是第一步,将上一周期的经验 (s_{t-k}, a_{t-k}, r_t, s_t, done_t) 存入buffer并学习
# (这里的 r_t 是指从 t-k 到 t 这段时间的累积奖励或瞬时奖励)
# 2. 执行仿真步 `traci.simulationStep()`
# 3. `s_t` 变为 `s_{t+1}`
# 让我们坚持每个 ACTION_DURATION_STEPS 存储一次转换并学习。
# `state` 将是上一个决策周期的结束状态 (即当前决策周期的开始状态)。
# `action_idx` 是针对这个 `state` 做出的决策。
# `next_state` 和 `reward` 是执行 `action_idx` 持续 `ACTION_DURATION_STEPS` 后的结果。
# 修正:需要在 store_transition 之前确定 action_idx
# 我们假设在进入这个 "学习" 阶段之前,action_idx 已经被选定并执行了一段时间。
# 这意味着,我们需要一个变量来保存上一个决策周期选择的动作。
# 假设 `action_idx_for_period` 是当前周期正在执行的动作。
# 一个更清晰的循环结构:
# state = get_current_state()
# for step in range(total_simulation_duration):
# if step % ACTION_DURATION_STEPS == 0:
# current_action_idx = agent.select_action(state)
# # 应用动作 current_action_idx
# current_phase_logic = current_action_idx
# traci.trafficlight.setPhase(TLS_ID, POSSIBLE_ACTIONS[current_phase_logic])
#
# traci.simulationStep()
#
# if (step + 1) % ACTION_DURATION_STEPS == 0: # 一个动作周期结束
# next_s = get_current_state()
# r = get_reward() # 这个奖励是这个周期末尾的瞬时奖励,或者这个周期的累积奖励
# d = (step + 1 >= total_simulation_duration)
# agent.store_transition(state, current_action_idx, next_s, r, d) # state 和 current_action_idx 是周期开始时的
# agent.learn()
# state = next_s # 更新状态
# total_reward_episode += r
# agent.update_epsilon()
# 为了使用当前代码的 `if (traci.simulation.getTime() - current_action_start_time) >= ACTION_DURATION_STEPS:` 结构:
# 我们需要在 `agent.select_action(state)` 之后,但在进入下一个 `ACTION_DURATION_STEPS` 循环之前,
# 将 (旧state, 刚选的action, 新reward, 新state, done) 存起来。
# 这里的 `state` 是旧的,`next_state` 和 `reward` 是新的。
# --- 简化:在每个决策点学习上一个周期的经验 ---
# 假设 `prev_state`, `prev_action_idx` 已经被记录
if step > ACTION_DURATION_STEPS : # 确保有上一个周期的经验可以学习
# `state` 此时是上一个周期结束时的状态 (即 `s_prime_prev`)
# `reward` 是上一个周期结束时计算的奖励 (`r_prev`)
# 我们需要的是 `s_prev`, `a_prev`, `r_prev`, `s_prime_prev` (即当前`state`)
# 这个逻辑需要仔细设计,很容易出错。
# **我们采用标准的RL循环结构来改写 run_one_episode**
if step > 0: # 从第二步开始,因为第一步是初始化
# 假设 action_idx 是上一步选择的,state 是上一步的状态
# next_state 是当前步模拟后的状态,reward 是当前步的奖励
# 这意味着我们需要在循环开始前获取 action_idx
# agent.store_transition(previous_state_for_buffer, action_idx_for_buffer, state, reward, done)
# agent.learn()
# agent.update_epsilon()
pass # 这个位置的学习逻辑比较复杂,先跳过,专注于DQN Agent的调用
total_reward_episode += reward # 累加奖励
# 学习过程应该在循环内部,每次获得一个完整的(s,a,r,s')后
# 这里的简化循环没有清晰地展示学习步骤。
# 真正的学习应该在 agent.store_transition()之后调用 agent.learn()
traci.close() # 关闭TraCI连接
print(f"Episode {episode_num+1}: Total Reward: {total_reward_episode:.2f}, Steps: {step+1}, Epsilon: {agent.epsilon:.3f}") # 打印轮次信息
return total_reward_episode, step + 1 # 返回总奖励和步数
# ---- 更标准的 RL 循环 for run_one_episode ----
def run_one_episode_std_rl(episode_num, agent, sumocfg_file, gui=False, max_steps_per_episode=3600, port=8813): # 定义单个训练轮次的标准RL版本
global current_phase_logic # 使用全局变量 current_phase_logic
start_simulation(sumocfg_file, gui=gui, port=port) # 启动SUMO仿真
current_phase_logic = 0 # 初始假设为NS绿灯逻辑
traci.trafficlight.setPhase(TLS_ID, POSSIBLE_ACTIONS[current_phase_logic]) # 设置初始SUMO相位
# 几步让初始相位生效并获取初始状态
for _ in range(5): # 5步
if traci.simulation.getMinExpectedNumber() <= 0: # 检查仿真是否提前结束
traci.close() # 关闭TraCI连接
return 0, 0 # 返回0奖励和0步数
traci.simulationStep() # 执行一步仿真
state = get_current_state() # 获取初始状态
total_reward_episode = 0 # 初始化本轮总奖励
for step_in_episode in range(max_steps_per_episode): # 在每轮最大步数内循环
if traci.simulation.getMinExpectedNumber() <= 0: # 检查仿真是否提前结束
print(f"SUMO closed prematurely at step {step_in_episode} in episode {episode_num}.") # 打印提前结束信息
break # 跳出循环
action_idx = agent.select_action(state) # Agent根据当前状态选择动作
# 应用动作到SUMO,并让其持续 ACTION_DURATION_STEPS
chosen_sumo_phase = POSSIBLE_ACTIONS[action_idx] # 获取对应的SUMO相位
current_phase_logic = action_idx # 更新全局相位逻辑
traci.trafficlight.setPhase(TLS_ID, chosen_sumo_phase) # 设置交通灯相位
accumulated_reward_this_action_period = 0 # 初始化本动作周期的累积奖励
for _ in range(ACTION_DURATION_STEPS): # 动作持续期循环
if traci.simulation.getMinExpectedNumber() <= 0: # 检查仿真是否提前结束
break # 跳出内部循环
traci.simulationStep() # 执行一步仿真
# 奖励可以是每一步的瞬时奖励之和,或者是周期结束时的状态奖励
# 为了简单,我们使用周期结束时的瞬时奖励
# accumulated_reward_this_action_period += get_reward() # (如果奖励是每步累加)
if traci.simulation.getMinExpectedNumber() <= 0: # 再次检查仿真是否提前结束
break # 跳出主循环
next_state = get_current_state() # 获取动作周期结束后的下一状态
reward = get_reward() # 获取动作周期结束后的奖励 (基于next_state的瞬时奖励)
# reward = accumulated_reward_this_action_period # (如果使用累积奖励)
done = (step_in_episode >= max_steps_per_episode - ACTION_DURATION_STEPS) # 判断是否回合结束
agent.store_transition(state, action_idx, next_state, reward, done) # 存储经验
agent.learn() # Agent学习
state = next_state # 更新状态
total_reward_episode += reward # 累加总奖励
agent.update_epsilon() # 更新探索率
if done: # 如果回合结束
break # 跳出主循环
traci.close() # 关闭TraCI连接
print(f"Episode {episode_num+1}: Total Reward: {total_reward_episode:.2f}, Steps in Ep: {step_in_episode+1}, Epsilon: {agent.epsilon:.3f}") # 打印轮次信息
return total_reward_episode, step_in_episode + 1 # 返回总奖励和步数
# --- 主训练代码 ---
if __name__ == '__main__':
NUM_EPISODES_TRAIN = 500 # 训练的总轮次数
# SUMO 配置文件路径 (你需要创建这个文件和相关的 .net.xml, .rou.xml)
# 确保 .net.xml 中有名为 "J1" 的交通灯,并且其相位程序与POSSIBLE_ACTIONS兼容
# 确保 .net.xml 中定义了 INCOMING_LANE_AREA_DETECTORS 中的 e2 检测器
sumo_config_file = "your_scenario_for_dqn.sumocfg"
# 示例:如何创建最简单的配置文件 (你需要根据你的DQN需求调整)
# network_file_dqn = "dqn_intersection.net.xml"
# route_file_dqn = "dqn_routes.rou.xml"
# config_file_dqn = "your_scenario_for_dqn.sumocfg"
# if not os.path.exists(network_file_dqn):
# with open(network_file_dqn, "w") as f:
# f.write(f"""<network>
# <node id="J1" x="0.0" y="0.0" type="traffic_light"/>
# <node id="N1" x="0.0" y="100.0" type="priority"/> <node id="S1" x="0.0" y="-100.0" type="priority"/>
# <node id="E1" x="100.0" y="0.0" type="priority"/> <node id="W1" x="-100.0" y="0.0" type="priority"/>
# <edge id="edge_N_in" from="N1" to="J1"><lane id="lane_N_in_0" index="0" speed="13.89" length="100.0"/></edge>
# <edge id="edge_S_in" from="S1" to="J1"><lane id="lane_S_in_0" index="0" speed="13.89" length="100.0"/></edge>
# <edge id="edge_E_in" from="E1" to="J1"><lane id="lane_E_in_0" index="0" speed="13.89" length="100.0"/></edge>
# <edge id="edge_W_in" from="W1" to="J1"><lane id="lane_W_in_0" index="0" speed="13.89" length="100.0"/></edge>
# <edge id="edge_out_N" from="J1" to="N1"/> <edge id="edge_out_S" from="J1" to="S1"/>
# <edge id="edge_out_E" from="J1" to="E1"/> <edge id="edge_out_W" from="J1" to="W1"/>
# <tlLogic id="{TLS_ID}" type="static" programID="0" offset="0">
# <phase duration="30" state="GGggrrrrGGgg"/> <!-- NS Green (index 0) -->
# <phase duration="4" state="yyyyrrrryyyy"/> <!-- NS Yellow (index 1) -->
# <phase duration="30" state="rrrrGGggrrrr"/> <!-- EW Green (index 2) -->
# <phase duration="4" state="rrrryyyyrrrr"/> <!-- EW Yellow (index 3) -->
# </tlLogic>
# <!-- Lane Area Detectors (e2) -->
# <additional>
# <laneAreaDetector id="{INCOMING_LANE_AREA_DETECTORS[0]}" lane="lane_N_in_0" pos="-50" length="40" freq="1" file="e2_N.out"/>
# <laneAreaDetector id="{INCOMING_LANE_AREA_DETECTORS[1]}" lane="lane_S_in_0" pos="-50" length="40" freq="1" file="e2_S.out"/>
# <laneAreaDetector id="{INCOMING_LANE_AREA_DETECTORS[2]}" lane="lane_E_in_0" pos="-50" length="40" freq="1" file="e2_E.out"/>
# <laneAreaDetector id="{INCOMING_LANE_AREA_DETECTORS[3]}" lane="lane_W_in_0" pos="-50" length="40" freq="1" file="e2_W.out"/>
# </additional>
# </network>""")
# print(f"Created dummy {network_file_dqn}")
# if not os.path.exists(route_file_dqn):
# with open(route_file_dqn, "w") as f:
# f.write("""<routes>
# <vType id="car" accel="2.6" decel="4.5" sigma="0.5" length="5" maxSpeed="13.89" />
# <flow id="f_N_S" type="car" from="edge_N_in" to="edge_out_S" begin="0" end="3600" number="300"/>
# <flow id="f_S_N" type="car" from="edge_S_in" to="edge_out_N" begin="0" end="3600" number="300"/>
# <flow id="f_E_W" type="car" from="edge_E_in" to="edge_out_W" begin="0" end="3600" number="300"/>
# <flow id="f_W_E" type="car" from="edge_W_in" to="edge_out_E" begin="0" end="3600" number="300"/>
# </routes>""")
# print(f"Created dummy {route_file_dqn}")
# if not os.path.exists(config_file_dqn):
# with open(config_file_dqn, "w") as f:
# f.write(f"""<configuration>
# <input>
# <net-file value="{os.path.basename(network_file_dqn)}"/>
# <route-files value="{os.path.basename(route_file_dqn)}"/>
# <additional-files value="{os.path.basename(network_file_dqn)}"/> <!-- Include network again if detectors are in <additional> inside .net.xml -->
# </input>
# <time><begin value="0"/><end value="4000"/></time>
# <processing><waiting-time-memory value="1000"/></processing>
# </configuration>""")
# print(f"Created dummy {config_file_dqn}")
# sumo_config_file = config_file_dqn
if not os.path.exists(sumo_config_file): # 检查配置文件是否存在
print(f"Error: SUMO configuration file '{sumo_config_file}' not found.") # 打印错误信息
print("Please create the SUMO scenario files (.net.xml with TLS 'J1' and e2 detectors, .rou.xml, .sumocfg).") # 提示用户创建文件
sys.exit(1) # 退出程序
# 创建DQN智能体
dqn_agent = DQNAgent(state_dim=STATE_DIM,
num_actions=NUM_ACTIONS,
replay_buffer_capacity=50000, # 经验回放区容量
batch_size=64, # 批次大小
gamma=0.95, # 折扣因子
learning_rate=0.0005, # 学习率
target_update_freq=200, # 目标网络更新频率 (按学习次数计)
epsilon_start=1.0, # 初始探索率
epsilon_end=0.01, # 最终探索率
epsilon_decay=0.99, # 探索率衰减
device="cuda" if torch.cuda.is_available() else "cpu") # 使用GPU或CPU
episode_rewards = [] # 初始化每轮奖励列表
total_steps_across_episodes = 0 # 初始化总步数
# 训练循环
for i_episode in range(NUM_EPISODES_TRAIN): # 循环指定的训练轮次数
# GUI可以用于调试,但会显著减慢训练速度
show_gui = False # 设置是否显示GUI
# if i_episode % 50 == 0 or i_episode < 2: # 每50轮或前2轮显示GUI
# show_gui = True
# 使用标准RL循环的 run_one_episode_std_rl
# 注意,TraCI端口需要不同,以防多个SUMO实例冲突(如果并行或快速重启)
# 但这里是串行,所以端口可以固定,或每次生成唯一的
reward_ep, steps_ep = run_one_episode_std_rl(i_episode, dqn_agent, sumo_config_file,
gui=show_gui,
max_steps_per_episode=1800, # 每轮最大仿真步数 (例如半小时)
port=8813 + (i_episode % 5)) # 稍微改变端口以避免快速重启时的冲突 (可选)
episode_rewards.append(reward_ep) # 添加本轮奖励到列表
total_steps_across_episodes += steps_ep # 累加总步数
if (i_episode + 1) % 10 == 0: # 每10轮打印一次平均奖励
avg_reward = np.mean(episode_rewards[-10:]) # 计算最近10轮的平均奖励
print(f"--- Episode {i_episode+1}/{NUM_EPISODES_TRAIN} ---") # 打印当前轮次
print(f" Avg Reward (last 10): {avg_reward:.2f}") # 打印平均奖励
print(f" Current Epsilon: {dqn_agent.epsilon:.3f}") # 打印当前探索率
print(f" Replay Buffer Size: {len(dqn_agent.replay_buffer)}") # 打印经验回放区大小
# (可选) 保存模型
# torch.save(dqn_agent.policy_net.state_dict(), f"dqn_policy_net_ep{i_episode+1}.pth")
print("Training finished.") # 打印训练完成信息
# (可选) 绘制奖励曲线
try:
import matplotlib.pyplot as plt # 导入matplotlib库
plt.figure(figsize=(10,5)) # 创建图像,设置大小
plt.plot(episode_rewards) # 绘制奖励曲线
# 计算移动平均奖励,使曲线更平滑
moving_avg = np.convolve(episode_rewards, np.ones(20)/20, mode='valid') # 计算20轮的移动平均
plt.plot(np.arange(len(moving_avg)) + 19, moving_avg, label='Moving Avg (20 episodes)', color='red') # 绘制移动平均曲线
plt.xlabel("Episode") # 设置x轴标签
plt.ylabel("Total Reward per Episode") # 设置y轴标签
plt.title("DQN Training Progress for Traffic Signal Control") # 设置标题
plt.legend() # 显示图例
plt.grid(True) # 显示网格
plt.savefig("dqn_training_rewards.png") # 保存图像
# plt.show() # 显示图像 (如果在服务器上,可能不需要)
print("Reward plot saved to dqn_training_rewards.png") # 打印保存信息
except ImportError: # 捕获导入错误
print("Matplotlib not installed, skipping reward plot.") # 打印跳过绘图信息
python

代码的关键点和说明 (run_one_episode_std_rl 和 main):
SUMO场景与DQN参数 :
* `sumo_config_file`: 指向你的 `.sumocfg` 文件。这个文件应包含路网 (`.net.xml`),其中有ID为 `TLS_ID` (“J1”) 的交通灯,并且其相位程序应与 `POSSIBLE_ACTIONS` 中的SUMO相位索引对应。
* **e2检测器** : 路网中必须定义 `INCOMING_LANE_AREA_DETECTORS` 中指定的 `laneAreaDetector` (e2Detector),用于获取队列信息。确保这些检测器的 `freq` 属性足够高(例如 “1” 或 “60”)以提供频繁的更新。`pos` 和 `length` 属性定义了检测区域。
* **车辆路由** : `.rou.xml` 文件定义了车流。
* **状态维度 (`STATE_DIM`)**: 根据你选择的特征数量(检测器数量 + 其他特征如当前相位逻辑)来确定。
* **动作 (`POSSIBLE_ACTIONS`, `NUM_ACTIONS`)**: 定义智能体可以选择的SUMO相位索引。
* **归一化 (`MAX_QUEUE_PER_DETECTOR`)**: 对于基于特征向量的状态,归一化非常重要。你需要根据你的场景估计一个合理的检测器最大车辆数。
* **`current_phase_logic`** : 全局变量用于在状态中表示当前哪个主要方向是绿灯。这是一种简化,更复杂情况下可以通过解析 `traci.trafficlight.getPhase(TLS_ID)` 和相位定义来确定。
标准强化学习循环 (run_one_episode_std_rl):
* **初始化** : 设置初始相位,让SUMO几步以稳定。
* **主循环** :
* **获取状态 (`state = get_current_state()`)**: 从SUMO获取当前交通状况并构造成状态向量。
* **选择动作 (`action_idx = agent.select_action(state)`)**: DQN智能体使用其策略网络(和ε-greedy)选择一个动作。
* **执行动作** : 将选定的 `action_idx` 映射到SUMO的实际相位索引,并通过 `traci.trafficlight.setPhase()` 应用。然后,让该相位持续 `ACTION_DURATION_STEPS`。
* **获取经验** : 在动作持续期结束后,获取新的状态 `next_state` 和奖励 `reward`。判断是否回合结束 `done`。
* **存储经验 (`agent.store_transition(...)`)**: 将 ((state, action_idx, next_state, reward, done)) 元组存入经验回放缓冲区。
* **学习 (`agent.learn()`)**: DQN智能体从缓冲区采样并更新其策略网络。
* **更新状态和探索率** : `state = next_state`,并调用 `agent.update_epsilon()`。
* **奖励计算 (`get_reward()`)**: 当前示例是负的停止车辆数。你可以根据优化目标设计更复杂的奖励。
超参数 :
* `replay_buffer_capacity`, `batch_size`, `gamma`, `learning_rate`, `target_update_freq`, `epsilon` 相关参数对DQN的性能影响很大,需要仔细调整。
* `ACTION_DURATION_STEPS`: 动作(相位)的持续时间。太短可能导致不稳定,太长则响应慢。
SUMO端口 :
* 在 `run_one_episode_std_rl` 中,`port=8813 + (i_episode % 5)` 是一种简单的避免SUMO在快速连续启动时端口占用的方法。如果串行且没有问题,可以固定端口。
文件生成 :
* 代码中注释掉的部分展示了如何生成一个非常基础的DQN场景文件。你需要根据你的具体需求(特别是 `tlLogic` 和 `laneAreaDetector` 定义)来完善这些文件。**确保SUMO的相位索引与`POSSIBLE_ACTIONS` 数组中的值匹配!**
与调试 :
* 首先确保你的SUMO环境已正确安装并配置。
* 安装 PyTorch (`pip install torch torchvision torchaudio`) 和 NumPy (`pip install numpy`)。
* 创建一个有效的SUMO场景。
* 逐步和调试,先确保与SUMO的交互 (状态获取、动作执行) 是正确的,然后再关注DQN的学习过程。
* 观察奖励曲线是判断训练是否有效的重要手段。如果奖励持续不增长或下降,可能需要调整状态表示、奖励函数或超参数。
5.8 DQN 的进一步考虑和改进
更复杂的状态/动作空间 :
* 如果交叉口非常复杂,或需要控制多个交叉口,状态和动作空间会更大。
* 对于多交叉口控制,可以考虑多智能体强化学习 (MARL) 方法,或者将整个区域的状态输入一个中心化的DQN智能体(如果可行)。
DQN变体 :
* **Double DQN (DDQN)** : 解决了标准DQN中Q值过高估计的问题,通过解耦目标Q值计算中的动作选择和价值评估。在计算目标时,动作由在线网络选择,而该动作的价值由目标网络评估:
(y_t = r_t + \gamma Q(s_{t+1}, \arg\max_{a’} Q(s_{t+1}, a’; \theta_t); \theta_t^-))
* **Dueling DQN** : 将Q网络分解为两个流:一个用于估计状态价值函数 V(s),另一个用于估计每个动作的优势函数 A(s,a)。然后组合它们得到Q值: (Q(s,a) = V(s) + (A(s,a) - \frac{1}{|\mathcal{A}|} \sum_{a’} A(s,a’)))。这有助于学习哪些状态是有价值的,而无需关心动作。
* **Prioritized Experience Replay (PER)** : 不再均匀地从回放缓冲区采样,而是优先选择那些TD误差较大的经验(即智能体预测不准的经验),因为这些经验可能包含更多学习信息。
* **Rainbow DQN** : 结合了多种DQN改进(如DDQN, Dueling DQN, PER, Noisy Nets等)的算法。
超参数调优 :
* DQN对超参数敏感。可以使用网格搜索、随机搜索或更高级的贝叶斯优化方法来寻找最佳超参数组合。
奖励设计 :
* 这是最关键也最具挑战性的部分。奖励函数需要准确反映你的优化目标(如减少延误、提高吞吐量、减少排放等),并且要避免智能体学到利用奖励函数漏洞的"作弊"行为。
与SUMO的交互效率 :
* 对于大规模仿真或需要快速训练的场景,Python与SUMO通过TraCI的交互速度可能成为瓶颈。
* 可以考虑使用 `libsumo` (SUMO的C++库的Python绑定),它通常比TraCI更快,因为它在同一进程中。
* 批量处理TraCI命令(如果适用)也可以提高效率。
通过DQN及其变体,我们可以为更复杂的交通场景设计出更智能、更自适应的信号控制策略。这仍然是一个活跃的研究领域,不断有新的方法和改进被提出来。
第六部分:处理大规模路网和长时间仿真的性能优化技巧
在将强化学习应用于真实的、大规模的交通网络优化时,或者当训练过程需要数百万甚至数千万仿真步数时,仿真速度和计算效率直接影响研究和应用的可行性。以下是一些关键的优化策略和技巧:
6.1 SUMO 自身的性能配置和优化
SUMO 提供了多种命令行选项和配置参数来影响其仿真性能。
选择合适的 SUMO 可执行文件 :
* `sumo`: 命令行版本,通常比 GUI 版本快,适用于后台训练。
* `sumo-gui`: 图形用户界面版本,方便调试和可视化,但会消耗更多资源。在正式训练或大规模仿真时应避免使用。
仿真步长 (--step-length):
* 这是最重要的参数之一。默认值通常是 `1` 秒。
* **较小的步长** (如 `0.1` 秒) 可以提高仿真精度,尤其是在需要精细控制车辆行为(如ACC、CACC)或模拟V2X通信时。但它会显著增加计算量和仿真时间。
* **较大的步长** (如 `1` 秒或 `2` 秒) 可以加快仿真速度,但可能会牺牲一些行为的真实性,尤其是在高动态场景下。
* 对于RL应用,决策频率通常与某个整数倍的仿真步长相关。如果RL Agent每 `N` 个仿真步决策一次,而每个决策控制的相位持续 `M` 秒,那么步长需要能被 `M/N` 整除(或者说 `M` 和 `N*step-length` 要匹配)。
* **建议** : 根据应用需求和可接受的精度损失,选择尽可能大的步长。
禁用不必要的输出 (--no-warnings, --no-step-log):
* `--no-warnings`: 禁止SUMO输出警告信息到控制台。在确认场景配置无误后,可以开启以减少IO开销。
* `--no-step-log`: 禁止在每个仿真步输出简要信息。
* 默认情况下,SUMO可能会生成一些XML输出文件(如行程信息、排放信息等)。如果这些文件在RL训练过程中不是必需的,可以通过命令行选项禁用它们的生成,或者减少它们的输出频率。
* 例如,`--summary-output /dev/null` (Linux) 或 `NUL` (Windows) 可以将摘要输出重定向到空设备。
* `--tripinfo-output.write-unfinished` 选项可以控制是否写入未完成的行程信息。
* 许多输出选项都有 `*.period` 子选项,可以设置输出的周期。例如,`--statistic-output.period 3600` 表示每3600秒输出一次统计信息。
线程数 (--threads):
* SUMO 本身在某些计算上(如车辆移动和碰撞检测)可以利用多核处理器。
* `--threads <INT>`: 设置SUMO可以使用的线程数。通常设置为机器的物理核心数可以获得较好效果。但并非所有部分都能完美并行,过多的线程有时反而可能因同步开销导致性能下降。
* **建议** : 测试不同的线程数,找到最适合你硬件和场景的设置。
随机数种子 (--seed <INT>):
* 虽然不是直接的性能优化,但在RL训练中,为了实验的可复现性,设置固定的随机数种子非常重要。`--seed` 选项可以控制SUMO内部的随机过程。Python脚本中的随机数(如`random`库, `numpy.random`)也需要设置种子。
瞬移时间 (--time-to-teleport <FLOAT>):
* 当车辆长时间卡在拥堵中无法前进时,SUMO 允许将其“瞬移”到下游或从仿真中移除,以避免整个仿真因局部死锁而停滞。
* `--time-to-teleport -1`: 禁止车辆瞬移。这在需要真实模拟拥堵演化时是必要的,但也可能导致仿真在严重拥堵下卡死。
* `--time-to-teleport <SECONDS>`: 设置车辆在宣布卡死并可能瞬移前的最大等待时间。
* 对于RL,如果目标是缓解拥堵,通常会禁止瞬移或设置一个非常大的瞬移时间,让Agent学习如何处理拥堵。如果允许瞬移,Agent可能会学到让车辆瞬移的“坏”策略。
* **性能影响** : 如果大量车辆瞬移,可能会引入一些计算开销,但也可能通过移除卡死车辆来“加速”仿真(尽管这不是真实的加速)。
路网简化和抽象 :
* 对于非常大规模的路网,如果研究重点只在特定区域,可以考虑:
* **仿真边界** : 只模拟感兴趣的核心区域,外围区域通过动态的车辆源和汇(source/sink edges)来表示。
* **路网聚合 (Network Aggregation)** : 对于非核心区域,可以将详细的路网结构简化为更宏观的模型或更少的路段。SUMO本身不直接提供高级的自动聚合工具,但这可以通过预处理脚本实现。
* **分层控制 (Hierarchical Control)** : 例如,高层RL Agent控制区域边界流量或宏观策略,底层Agent控制具体交叉口。
车辆行为模型的复杂度 :
* SUMO允许为不同车辆类型定义不同的跟驰模型 (Car-Following Model, e.g., Krauss, IDM, Wiedemann) 和换道模型 (Lane-Changing Model, e.g., SL2015, LC2013)。
* 更复杂的模型通常更真实,但也更耗计算资源。如果研究不需要非常精细的微观行为,可以选择计算效率更高的模型。
* `<vType>` 定义中的参数,如 `sigma` (驾驶员不完美性/随机性),如果设置过大,可能导致更多不稳定的行为和碰撞,从而增加仿真复杂性。
检测器的数量和频率 :
* RL Agent的状态获取依赖于检测器数据。大量的检测器,或者非常高频率的检测器数据更新 (如 `e1Detector` 的 `freq` 属性设得很小),会增加数据处理和TraCI通信的开销。
* **建议** : 只放置和查询RL Agent决策所必需的检测器,并设置合理的更新频率。
SUMO版本 :
* 保持SUMO更新到最新稳定版本。开发者会不断进行性能优化和bug修复。
6.2 TraCI 交互的优化 (Python 端)
Python与SUMO通过TraCI的交互是性能瓶颈的常见来源,因为每次TraCI调用都涉及到进程间通信 (IPC)。
libsumo (推荐) :
* `libsumo` 是SUMO仿真库的Python绑定,它将SUMO作为一个Python模块直接加载到Python进程中。这消除了TraCI基于Socket的IPC开销,从而**显著提高** 交互速度,尤其是在需要频繁调用TraCI函数的场景下。
* **使用方法** :
* 安装: 通常随SUMO一起提供。你需要确保Python环境能够找到它。
* 代码修改:
# 使用 TraCI (基于socket)
# import traci
# traci.start(["sumo", "-c", "config.sumocfg"])
# while traci.simulation.getMinExpectedNumber() > 0:
# traci.simulationStep()
# # ... traci calls ...
# traci.close()
# 使用 libsumo
import libsumo as traci # 注意导入名称仍然可以是 traci 以便兼容
# 或者 import libsumo
# libsumo.start(["sumo", "-c", "config.sumocfg"]) # 或者 traci.start(...)
# # 对于libsumo,通常不需要显式指定端口,因为它在同一进程中
# # 确保命令行参数中不要有 --remote-port
# sumo_cmd = ["sumo", "-c", "your_scenario.sumocfg", "--step-length", "1"] # 移除 --remote-port
# traci.start(sumo_cmd) # 启动 libsumo
# while traci.simulation.getMinExpectedNumber() > 0:
# traci.simulationStep() # 执行一步仿真
# # ... traci.lane.getWaitingTime(...) 等调用 ...
# traci.close() # 关闭libsumo控制的仿真
python

-
注意事项 :
* 当使用libsumo时,SUMO仿真在Python进程的主线程中。如果Python脚本中有耗时的计算(例如神经网络的前向/后向传播),并且这些计算不是异步的,那么仿真可能会被阻塞。
*libsumo不支持sumo-gui,它只能与命令行sumo一起工作。
* 错误处理可能与标准TraCI略有不同。
批量命令 (Subscriptions) :
TraCI允许你订阅 (subscribe) 某些对象(车辆、车道、检测器等)的特定变量。订阅后,SUMO会在每个仿真步自动收集这些变量的值,你可以通过一次调用获取所有订阅变量的最新结果,而不是为每个变量单独进行多次调用。这大大减少了IPC的次数。
订阅过程 :
1. 在仿真开始或需要时,为感兴趣的对象和变量添加订阅。
* `traci.vehicle.subscribe(vehID, [varID1, varID2, ...], beginTime, endTime)`
* `traci.lane.subscribe(laneID, [varID_lane1, ...])`
* `traci.inductionloop.subscribe(detID, [varID_det1, ...])`
* `traci.junction.subscribeContext(junctionID, tc.constants.CMD_GET_VEHICLE_VARIABLE, 100, [tc.constants.VAR_SPEED, tc.constants.VAR_LANE_ID])` (订阅路口一定范围内的车辆信息)
2. 在每个仿真步之后,获取订阅结果。
* `results_veh = traci.vehicle.getSubscriptionResults(vehID)`
* `results_lane = traci.lane.getSubscriptionResults(laneID)`
* `results_det = traci.inductionloop.getSubscriptionResults(detID)`
* `results_context = traci.junction.getSubscriptionResults(junctionID)`
这些调用返回一个字典,键是变量ID (来自 tc.constants),值是该变量的当前值。
示例 : 订阅一个检测器的占用率和平均速度。
import traci
import traci.constants as tc # 导入TraCI常量
# ... (start_simulation) ...
detector_id = "my_detector_0" # 你的检测器ID
# 假设在 .add.xml 或 .net.xml 中定义了该检测器
# 添加订阅 (通常在仿真循环开始前)
# VAR_LAST_STEP_MEAN_SPEED (81), VAR_LAST_STEP_OCCUPANCY (80)
try:
traci.inductionloop.subscribe(detector_id,
[tc.VAR_LAST_STEP_MEAN_SPEED, tc.VAR_LAST_STEP_OCCUPANCY],
0, 2147483647) # 从0到最大时间订阅
print(f"Subscribed to detector {detector_id}") # 打印订阅成功信息
except traci.TraCIException as e: # 捕获TraCI异常
print(f"Could not subscribe to detector {detector_id}: {e}") # 打印订阅失败信息
# 可能检测器ID不存在或类型不匹配
# --- 仿真循环 ---
# while traci.simulation.getMinExpectedNumber() > 0:
# traci.simulationStep() # 执行一步仿真
#
# try:
# detector_data = traci.inductionloop.getSubscriptionResults(detector_id) # 获取订阅结果
# if detector_data: # 如果结果非空
# mean_speed = detector_data.get(tc.VAR_LAST_STEP_MEAN_SPEED) # 获取平均速度
# occupancy = detector_data.get(tc.VAR_LAST_STEP_OCCUPANCY) # 获取占用率
# # print(f"Det: {detector_id}, Speed: {mean_speed}, Occupancy: {occupancy}") # 打印数据
# else:
# # print(f"No subscription data for {detector_id} this step.") # 打印无数据信息
# pass
# except traci.TraCIException as e: # 捕获TraCI异常
# print(f"Error getting subscription results for {detector_id}: {e}") # 打印获取错误信息
# break # 出现错误则跳出循环
#
# # ... (RL agent logic) ...
# traci.close()
python

何时使用 : 当你需要为大量对象或同一对象的多个变量在每个步骤获取数据时,订阅机制能带来显著的性能提升。
减少不必要的 TraCI 调用 :
* **缓存静态数据** : 路网结构(如车道列表、交叉口连接关系)在仿真过程中通常是不变的。可以在仿真开始时获取这些信息并缓存在Python脚本中,避免在每个仿真步重复查询。
* **按需获取** : 只在RL Agent需要决策或计算奖励时才获取相关的状态信息,而不是在每个仿真步都获取所有可能的数据。
* **逻辑聚合** : 如果你需要基于多个原始TraCI数据计算一个综合指标(例如,一个区域的总排队长度),尽可能在获取原始数据后,在Python端一次性计算,而不是通过多个TraCI调用让SUMO分别计算再传输。
控制 TraCI 调用频率 :
* RL Agent 的决策频率 (`ACTION_DURATION_STEPS` 或类似参数) 控制了状态获取和动作执行的频率。如果决策频率较低(例如每10秒或更长),那么在决策间隔内就不需要频繁调用TraCI获取完整状态。
6.3 Python 代码本身的优化
NumPy 和向量化操作 :
* 当处理状态特征、奖励计算或神经网络输入/输出时,如果涉及到大量数值运算,应优先使用NumPy数组及其向量化操作,而不是Python的原生列表和循环。NumPy的底层实现是C,速度远快于Python循环。
* **示例** : 计算多个检测器的总排队长度。
import numpy as np # 导入NumPy库
# 假设 queues_list 是一个Python列表,包含各个检测器的排队数
# queues_list = [get_queue("det1"), get_queue("det2"), ..., get_queue("detN")]
# 使用NumPy
# queues_array = np.array([get_queue(det_id) for det_id in detector_ids_list]) # 创建NumPy数组
# total_queue_numpy = np.sum(queues_array) # 使用NumPy的sum函数计算总和
# 对比Python原生循环
# total_queue_python = 0
# for det_id in detector_ids_list:
# total_queue_python += get_queue(det_id) # 循环累加
python

对于大规模数据,NumPy的优势非常明显。
高效的数据结构 :
* 根据需求选择合适的数据结构。例如,对于需要快速查找的场景,使用字典 (dict) 或集合 (set) 通常比列表 (list) 更高效。
* `collections.deque` 用于实现经验回放缓冲区就比Python列表在两端添加/删除元素时更高效。
避免不必要的对象创建 :
* 在循环中频繁创建和销毁大量小对象(例如,在每个仿真步都为状态向量创建一个新的列表或元组)可能会增加内存管理开销和垃圾回收的压力。如果可能,尝试复用对象或使用更轻量级的数据结构。
* 例如,如果状态向量结构固定,可以预先分配一个NumPy数组,然后在每个步骤更新其内容,而不是每次都创建一个新的。
性能分析工具 (Profiling) :
* 当遇到性能瓶颈时,不要凭感觉猜测,而应使用Python的性能分析工具来定位代码中的热点。
* **`cProfile` 和 `profile`**: Python内置的分析器,可以提供函数调用次数、执行时间等详细信息。
import cProfile # 导入cProfile库
import pstats # 导入pstats库
# def main_training_loop():
# # ... 你的训练代码 ...
# pass
# if __name__ == '__main__':
# profiler = cProfile.Profile() # 创建Profile对象
# profiler.enable() # 开始分析
#
# # main_training_loop() # 你的主函数
#
# profiler.disable() # 停止分析
# stats = pstats.Stats(profiler).sort_stats('cumulative') # 创建Stats对象并按累积时间排序
# stats.print_stats(20) # 打印前20个耗时最多的函数
# # stats.dump_stats("profile_results.prof") # (可选) 将结果保存到文件
python

-
line_profiler: 可以逐行分析代码的执行时间,对于定位循环或特定语句的瓶颈非常有用。需要额外安装 (pip install line_profiler) 并在目标函数上使用@profile装饰器。memory_profiler: 用于分析代码的内存使用情况。
Just-In-Time (JIT) 编译器 (如 Numba) :
* 对于计算密集型的纯Python函数(特别是包含大量循环和NumPy操作的函数),Numba可以通过LLVM将Python字节码编译成高效的机器码,从而大幅提升性能。
* 需要额外安装 (`pip install numba`)。
* **使用方法** : 在函数上添加 `@numba.jit` 装饰器。
import numba # 导入Numba库
import numpy as np # 导入NumPy库
# @numba.jit(nopython=True) # "nopython=True" 模式通常性能最好,但要求函数内所有操作都被Numba支持
# def some_computationally_intensive_function(data_array):
# result = 0.0 # 初始化结果
# for i in range(data_array.shape[0]): # 遍历数组
# for j in range(data_array.shape[1]): # 遍历数组
# result += np.sqrt(data_array[i, j] * np.log(data_array[i,j] + 1e-6)) # 执行复杂计算
# return result # 返回结果
python

- Numba对于科学计算和数据处理代码的加速效果显著,但并非所有Python代码都能被Numba优化。
6.4 强化学习训练过程的优化
异步执行与并行化 :
* **RL Agent 学习与 SUMO 仿真分离** :
* 在理想情况下,SUMO仿真(环境交互)和RL Agent的神经网络训练可以并行执行。
* 可以使用多进程或多线程:一个或多个进程/线程负责SUMO实例并收集经验,将经验发送到主进程;主进程中的Agent则专注于从经验回放缓冲区中采样并训练网络。
* Python的 `multiprocessing` 模块可以用来创建并管理并行SUMO的子进程。
* 需要有效的进程间通信机制(如 `multiprocessing.Queue`)来传递经验数据。
* **挑战** : 实现并行RL框架(如A2C, A3C, IMPALA, Ape-X)比单线程DQN复杂,需要处理好数据同步、梯度更新和资源管理。
* **Ape-X DQN 架构思想** :
* **Actors** : 多个并行的Actor进程,每个Actor自己的SUMO环境副本,与环境交互,使用当前的策略网络选择动作,并将产生的经验发送给Replay Buffer。Actor可能使用不同探索率的策略。
* **Learner** : 一个或多个Learner进程,从共享的(或分布式的)Replay Buffer中采样经验,计算梯度并更新中央的策略网络参数。
* **Replay Buffer** : 一个大的、可能优先级的经验回放缓冲区。
* Actor会定期从Learner那里获取最新的网络参数。
这种分布式架构可以极大地提高样本收集效率和训练速度。
GPU 加速 :
* 确保你的深度学习框架 (PyTorch, TensorFlow) 正确配置并使用GPU进行神经网络的训练 (前向和反向传播)。这对于具有大型网络或大批量数据的DQN训练至关重要。
* 在 `DQNAgent` 初始化时,将 `device` 参数设置为 `"cuda"` (如果GPU可用)。
* 将网络模型 (`self.policy_net`, `self.target_net`) 和训练数据张量 (`state_batch`, `action_batch`, etc.) 及时 `.to(self.device)`。
高效的经验回放实现 :
* `ReplayBuffer` 的实现需要高效。`collections.deque` 是一个不错的选择。
* 对于非常大的缓冲区,考虑使用更专门的库或数据结构,或者将缓冲区的一部分存储在磁盘上(如果内存不足),但这会增加I/O开销。
* Prioritized Experience Replay (PER) 的实现(如使用SumTree数据结构)需要小心,以确保其操作不会成为新的瓶颈。
减少学习频率 (如果必要) :
* 如果神经网络训练非常耗时,而环境交互相对较快,可以考虑不是在每次存储转换后都调用 `agent.learn()`,而是每隔 `N` 次转换或 `M` 个仿真步才进行一次或多次学习更新。这是一种权衡,可能会减慢学习速度,但能提高整体的wall-clock time效率。
模型大小和复杂度 :
* 神经网络的大小(层数、每层神经元数量)直接影响训练时间和推理时间。
* 从一个相对较小的模型开始,根据需要逐渐增加其复杂度。过度复杂的模型不仅训练慢,还可能更容易过拟合。
批量大小 (Batch Size) :
* `batch_size` 参数影响梯度估计的稳定性和训练速度。
* 较大的批量通常提供更稳定的梯度,但每次更新需要更多计算。
* 较小的批量更新更快,但梯度可能更嘈杂。
* 需要根据GPU内存和具体问题进行调整。
6.5 基础设施和硬件
- 强大的CPU : SUMO仿真主要受CPU性能限制。高主频、多核心的CPU对加速仿真至关重要。
- 足够的RAM : 大规模路网、长时间仿真以及大型经验回放缓冲区都需要足够的内存。
- 快速的存储 (SSD) : 如果涉及到大量的文件读写(如SUMO的输出、保存/加载模型、经验缓冲区溢出到磁盘等),SSD比HDD能提供更好的性能。
- GPU : 对于深度强化学习,一块或多块性能良好的GPU是加速神经网络训练的关键。
6.6 示例:使用 libsumo 和一个简单的优化思路
以下代码片段展示了如何切换到 libsumo,并提示了一些可以考虑的优化点。
# 假设这是你的主训练脚本
import os
import sys
import time
import numpy as np
import random
# 尝试导入 libsumo, 如果失败则回退到 traci (或者报错)
try:
import libsumo as traci # 使用 libsumo, 将其别名为 traci 以便代码复用
USING_LIBSUMO = True # 标记正在使用 libsumo
print("Successfully imported libsumo.") # 打印成功导入信息
except ImportError: # 捕获导入错误
try:
import traci # 尝试导入标准的 traci
USING_LIBSUMO = False # 标记未使用 libsumo
print("libsumo not found, falling back to TraCI over socket.") # 打印回退信息
except ImportError: # 再次捕获导入错误
sys.exit("Fatal Error: Neither libsumo nor traci could be imported. Please ensure SUMO is correctly installed and configured.") # 严重错误则退出
import traci.constants as tc # 导入TraCI常量
# --- (你的DQN Agent, ReplayBuffer等类的定义放在这里或导入它们) ---
# class DQNNet(nn.Module): ...
# class ReplayBuffer: ...
# class DQNAgent: ...
# --- 假设这些类已经定义好了 ---
# --- SUMO 和场景配置 ---
TLS_ID = "J1" # 交通灯ID
# ... (其他如 POSSIBLE_ACTIONS, STATE_DIM, INCOMING_LANE_AREA_DETECTORS 定义) ...
# 例如:
INCOMING_LANE_AREA_DETECTORS = ["e2det_N", "e2det_S", "e2det_E", "e2det_W"] # 示例检测器ID
STATE_DIM = len(INCOMING_LANE_AREA_DETECTORS) + 1 # 状态维度
POSSIBLE_ACTIONS = [0, 2] # 示例SUMO相位索引
NUM_ACTIONS = len(POSSIBLE_ACTIONS) # 动作数量
ACTION_DURATION_STEPS = 10 # 动作持续步数
MAX_QUEUE_PER_DETECTOR = 50 # 用于归一化的最大队列长度
# 全局变量
current_phase_logic_global = 0 # 全局当前相位逻辑
def get_sumo_exec_for_libsumo(gui=False): # 定义获取SUMO可执行文件名的函数 (libsumo不需要GUI)
return "sumo" # libsumo 只能与命令行sumo一起工作
def start_simulation_optimized(sumocfg_file, step_length=1.0, threads=None, seed=None, gui=False, port_for_traci_fallback=8813): # 定义优化的仿真启动函数
"""
启动SUMO仿真,优先使用libsumo。
如果使用libsumo,gui参数将被忽略。
"""
global traci # 确保我们使用的是正确的 traci (libsumo 或 socket traci)
sumo_binary = "" # 初始化SUMO执行文件名
sumo_cmd_list = [] # 初始化SUMO命令列表
if USING_LIBSUMO: # 如果正在使用 libsumo
sumo_binary = get_sumo_exec_for_libsumo(False) # 获取命令行sumo
sumo_cmd_list = [sumo_binary, "-c", sumocfg_file] # 构建基本命令
sumo_cmd_list.extend(["--step-length", str(step_length)]) # 添加步长
# libsumo 不使用 --remote-port
print("Starting simulation with libsumo...") # 打印启动信息
else: # 如果回退到TraCI over socket
sumo_binary = "sumo-gui" if gui else "sumo" # 根据gui参数选择执行文件
sumo_cmd_list = [sumo_binary, "-c", sumocfg_file] # 构建基本命令
sumo_cmd_list.extend(["--step-length", str(step_length)]) # 添加步长
sumo_cmd_list.extend(["--remote-port", str(port_for_traci_fallback)]) # 添加远程端口
print(f"Starting simulation with TraCI (socket on port {port_for_traci_fallback})...") # 打印启动信息
# 通用性能选项
sumo_cmd_list.extend(["--no-step-log", "true"]) # 禁止步日志
sumo_cmd_list.extend(["--no-warnings", "true"]) # 禁止警告 (谨慎使用,确保场景无问题)
sumo_cmd_list.extend(["--waiting-time-memory", "1000"]) # 设置等待时间内存
sumo_cmd_list.extend(["--time-to-teleport", "-1"]) # 禁止瞬移 (对于RL通常是好的)
if threads is not None and threads > 0: # 如果指定了线程数
sumo_cmd_list.extend(["--threads", str(threads)]) # 添加线程数参数
if seed is not None: # 如果指定了随机种子
sumo_cmd_list.extend(["--seed", str(seed)]) # 添加随机种子参数
random.seed(seed) # 设置Python内置随机种子
np.random.seed(seed) # 设置NumPy随机种子
# 如果使用PyTorch/TensorFlow,也需要设置它们的种子
# torch.manual_seed(seed)
print(f"SUMO command: {' '.join(sumo_cmd_list)}") # 打印最终的SUMO命令
traci.start(sumo_cmd_list, port=None if USING_LIBSUMO else port_for_traci_fallback) # 启动仿真
# 对于libsumo, port参数会被忽略或者可以不传。对于socket traci,需要指定端口。
# --- 状态和奖励函数 (与之前类似,但可以考虑订阅) ---
# 为了演示订阅,我们假设在 run_one_episode_optimized 中进行订阅
subscribed_detector_vars = [tc.VAR_LAST_STEP_VEHICLE_NUMBER, tc.VAR_LAST_STEP_HALTING_NUMBER] # 定义要订阅的检测器变量
def setup_subscriptions_for_detectors(detector_ids_list): # 定义设置检测器订阅的函数
"""为给定的检测器ID列表订阅指定的变量。"""
for det_id in detector_ids_list: # 遍历检测器ID列表
try:
traci.lanearea.subscribe(det_id, subscribed_detector_vars, 0, 2147483647) # 订阅检测器变量
# print(f"Subscribed to LaneAreaDetector {det_id} for vars: {subscribed_detector_vars}") # 打印订阅信息
except traci.TraCIException as e: # 捕获TraCI异常
print(f"Warning: Failed to subscribe to LaneAreaDetector {det_id}. Error: {e}") # 打印警告信息
# 这可能是因为检测器ID不存在,或者检测器类型不是LaneAreaDetector
# 对于 e1/e3 检测器,使用 traci.inductionloop.subscribe
def get_state_from_subscription(detector_ids_list, max_q_val): # 定义从订阅获取状态的函数
"""通过订阅结果获取状态。"""
global current_phase_logic_global # 使用全局当前相位逻辑
state_features = [] # 初始化状态特征列表
for det_id in detector_ids_list: # 遍历检测器ID列表
queue_val = 0.0 # 初始化队列值
try:
sub_results = traci.lanearea.getSubscriptionResults(det_id) # 获取检测器的订阅结果
if sub_results: # 如果结果非空
# 使用getLastStepVehicleNumber作为排队长度的代理
num_vehicles = sub_results.get(tc.VAR_LAST_STEP_VEHICLE_NUMBER, 0) # 获取车辆数,默认为0
queue_val = min(num_vehicles / max_q_val, 1.0) # 归一化队列值
except traci.TraCIException as e: # 捕获TraCI异常
# print(f"Warning: Error getting subscription for {det_id}: {e}") # 打印警告
pass # 忽略错误,使用默认值0.0
state_features.append(queue_val) # 添加队列值到特征列表
state_features.append(current_phase_logic_global) # 添加当前相位逻辑
return np.array(state_features, dtype=np.float32) # 返回状态特征数组
def get_reward_from_subscription(detector_ids_list): # 定义从订阅获取奖励的函数
"""通过订阅结果计算奖励 (例如,负的总停止车辆数)。"""
total_halting = 0 # 初始化总停止车辆数
for det_id in detector_ids_list: # 遍历检测器ID列表
try:
sub_results = traci.lanearea.getSubscriptionResults(det_id) # 获取检测器的订阅结果
if sub_results: # 如果结果非空
total_halting += sub_results.get(tc.VAR_LAST_STEP_HALTING_NUMBER, 0) # 累加停止车辆数,默认为0
except traci.TraCIException: # 捕获TraCI异常
pass # 忽略错误
return -float(total_halting) # 返回负的总停止车辆数
# --- 优化的训练循环 ---
def run_one_episode_optimized(episode_num, agent, sumocfg_file,
max_steps_per_episode=1800,
sumo_step_length=1.0,
sumo_threads=None,
sumo_seed_offset=0, # 用于为每个episode设置不同但可复现的种子
gui_for_traci_fallback=False,
port_for_traci_fallback=8813): # 定义优化的单轮训练函数
global current_phase_logic_global # 使用全局当前相位逻辑
# 为每个episode设置种子,如果提供了sumo_seed_offset
episode_seed = None # 初始化回合种子
if sumo_seed_offset is not None: # 如果提供了种子偏移
episode_seed = sumo_seed_offset + episode_num # 计算回合种子
start_simulation_optimized(sumocfg_file,
step_length=sumo_step_length,
threads=sumo_threads,
seed=episode_seed,
gui=gui_for_traci_fallback,
port_for_traci_fallback=port_for_traci_fallback) # 启动优化仿真
# 设置订阅
setup_subscriptions_for_detectors(INCOMING_LANE_AREA_DETECTORS) # 设置检测器订阅
current_phase_logic_global = 0 # 初始化当前相位逻辑
traci.trafficlight.setPhase(TLS_ID, POSSIBLE_ACTIONS[current_phase_logic_global]) # 设置初始相位
# 短暂以稳定并获取初始状态
for _ in range(5): # 5步
if traci.simulation.getMinExpectedNumber() <= 0: break # 如果仿真结束则跳出
traci.simulationStep() # 执行一步仿真
if traci.simulation.getMinExpectedNumber() <= 0: # 检查仿真是否提前结束
traci.close(); return 0, 0 # 关闭并返回
state = get_state_from_subscription(INCOMING_LANE_AREA_DETECTORS, MAX_QUEUE_PER_DETECTOR) # 获取初始状态
total_reward_episode = 0 # 初始化本轮总奖励
simulation_start_wall_time = time.time() # 记录仿真开始的墙上时间
for step_in_episode in range(max_steps_per_episode): # 在每轮最大步数内循环
if traci.simulation.getMinExpectedNumber() <= 0: # 检查仿真是否提前结束
# print(f"SUMO closed prematurely at step {step_in_episode} in optimized episode {episode_num}.")
break # 跳出循环
action_idx = agent.select_action(state) # Agent选择动作
chosen_sumo_phase = POSSIBLE_ACTIONS[action_idx] # 获取对应的SUMO相位
current_phase_logic_global = action_idx # 更新全局相位逻辑
traci.trafficlight.setPhase(TLS_ID, chosen_sumo_phase) # 设置交通灯相位
# 动作持续期
for _ in range(ACTION_DURATION_STEPS): # 动作持续期循环
if traci.simulation.getMinExpectedNumber() <= 0: break # 如果仿真结束则跳出
traci.simulationStep() # 执行一步仿真
if traci.simulation.getMinExpectedNumber() <= 0: break # 再次检查
next_state = get_state_from_subscription(INCOMING_LANE_AREA_DETECTORS, MAX_QUEUE_PER_DETECTOR) # 获取下一状态
reward = get_reward_from_subscription(INCOMING_LANE_AREA_DETECTORS) # 获取奖励
done = (step_in_episode * ACTION_DURATION_STEPS >= max_steps_per_episode - ACTION_DURATION_STEPS) # 判断是否回合结束
# (注意:这里的step_in_episode是决策步的计数,不是仿真步的计数)
# 或者更简单: done = ( (step_in_episode + 1) * ACTION_DURATION_STEPS >= max_steps_per_episode )
agent.store_transition(state, action_idx, next_state, reward, done) # 存储经验
# 学习可以不那么频繁,例如每隔几个决策步
if step_in_episode % 5 == 0 : # 每5个决策步学习一次 (可调)
if len(agent.replay_buffer) > agent.batch_size * 2: # 确保有足够样本
agent.learn() # Agent学习
state = next_state # 更新状态
total_reward_episode += reward # 累加总奖励
# Epsilon更新也可以不那么频繁
if step_in_episode % 10 == 0 : # 每10个决策步更新一次epsilon (可调)
agent.update_epsilon() # 更新探索率
if done: # 如果回合结束
break # 跳出主循环
simulation_end_wall_time = time.time() # 记录仿真结束的墙上时间
sim_duration_seconds = (step_in_episode + 1) * ACTION_DURATION_STEPS * sumo_step_length # 计算总仿真时长 (秒)
wall_clock_duration = simulation_end_wall_time - simulation_start_wall_time # 计算墙上时钟时长
# 计算仿真因子 (simulation time / wall-clock time)
# 避免除以零
simulation_factor = sim_duration_seconds / wall_clock_duration if wall_clock_duration > 1e-6 else float('inf')
traci.close() # 关闭TraCI连接
print(f"OptEp {episode_num+1}: Reward: {total_reward_episode:.2f}, " # 打印轮次信息
f"DecSteps: {step_in_episode+1}, SimTime: {sim_duration_seconds:.1f}s, " # 打印决策步数和仿真时长
f"WallTime: {wall_clock_duration:.2f}s, Factor: {simulation_factor:.2f}x, " # 打印墙上时长和仿真因子
f"Eps: {agent.epsilon:.3f}") # 打印探索率
return total_reward_episode, step_in_episode + 1, simulation_factor # 返回总奖励、步数和仿真因子
# --- 主训练代码 (使用优化后的循环) ---
if __name__ == '__main__':
NUM_EPISODES_TRAIN_OPT = 200 # 训练总轮次数 (减少以便测试)
sumo_cfg_opt = "your_scenario_for_dqn.sumocfg" # SUMO配置文件路径
if not os.path.exists(sumo_cfg_opt): # 检查配置文件是否存在
print(f"Error: SUMO configuration file '{sumo_cfg_opt}' not found.") # 打印错误信息
sys.exit(1) # 退出程序
# 创建DQN智能体 (参数与之前类似)
dqn_agent_opt = DQNAgent(state_dim=STATE_DIM,
num_actions=NUM_ACTIONS,
replay_buffer_capacity=10000, # 减少容量以便快速测试
batch_size=32, # 减小批次大小
gamma=0.90, # 折扣因子
learning_rate=0.001, # 学习率
target_update_freq=100, # 目标网络更新频率
epsilon_start=1.0, # 初始探索率
epsilon_end=0.05, # 最终探索率
epsilon_decay=0.995, # 探索率衰减
device="cuda" if torch.cuda.is_available() else "cpu") # 使用GPU或CPU
all_rewards_opt = [] # 初始化所有轮次奖励列表
all_sim_factors = [] # 初始化所有轮次仿真因子列表
for i_ep_opt in range(NUM_EPISODES_TRAIN_OPT): # 循环指定的训练轮次数
# 演示时,可以偶尔打开GUI (但libsumo不支持)
use_gui_fallback = False # 设置是否使用GUI (仅在libsumo不可用时生效)
# if not USING_LIBSUMO and (i_ep_opt % 20 == 0 or i_ep_opt < 2) :
# use_gui_fallback = True
port_val = 8813 + (i_ep_opt % 3) # 端口值
reward_val, steps_val, sim_factor_val = run_one_episode_optimized(
i_ep_opt, dqn_agent_opt, sumo_cfg_opt,
max_steps_per_episode=360, # 每轮决策步数 (例如 360决策 * 10步/决策 = 3600仿真秒)
sumo_step_length=1.0, # SUMO步长
sumo_threads=4, # SUMO线程数 (根据你的CPU调整)
sumo_seed_offset=1000, # 基础种子,确保每轮种子不同
gui_for_traci_fallback=use_gui_fallback,
port_for_traci_fallback=port_val
)
all_rewards_opt.append(reward_val) # 添加本轮奖励到列表
if sim_factor_val != float('inf'): # 如果仿真因子有效
all_sim_factors.append(sim_factor_val) # 添加本轮仿真因子到列表
if (i_ep_opt + 1) % 10 == 0 and all_sim_factors: # 每10轮打印一次平均仿真因子
print(f"--- Avg Sim Factor (last {len(all_sim_factors[-10:])} eps): {np.mean(all_sim_factors[-10:]):.2f}x ---") # 打印平均仿真因子
print("Optimized Training finished.") # 打印训练完成信息
# (可选) 绘制奖励和仿真因子曲线
try:
import matplotlib.pyplot as plt # 导入matplotlib库
fig, ax1 = plt.subplots(figsize=(12, 6)) # 创建图像和第一个y轴
color = 'tab:red' # 设置颜色
ax1.set_xlabel('Episode') # 设置x轴标签
ax1.set_ylabel('Total Reward', color=color) # 设置第一个y轴标签和颜色
ax1.plot(all_rewards_opt, color=color, linestyle='-') # 绘制奖励曲线
ax1.tick_params(axis='y', labelcolor=color) # 设置第一个y轴刻度颜色
ax1.grid(True, axis='y', linestyle=':', alpha=0.7) # 设置第一个y轴网格
ax2 = ax1.twinx() # 共享x轴,创建第二个y轴
color = 'tab:blue' # 设置颜色
ax2.set_ylabel('Simulation Factor (X times real-time)', color=color) # 设置第二个y轴标签和颜色
ax2.plot(all_sim_factors, color=color, linestyle='--') # 绘制仿真因子曲线
ax2.tick_params(axis='y', labelcolor=color) # 设置第二个y轴刻度颜色
fig.tight_layout() # 调整布局以防止标签重叠
plt.title('DQN Training Progress with Performance Metrics') # 设置标题
plt.savefig("dqn_opt_training_metrics.png") # 保存图像
# plt.show()
print("Metrics plot saved to dqn_opt_training_metrics.png") # 打印保存信息
except ImportError: # 捕获导入错误
print("Matplotlib not installed, skipping metrics plot.") # 打印跳过绘图信息
except Exception as e: # 捕获其他异常
print(f"Error during plotting: {e}") # 打印绘图错误
python

优化后的代码说明 :
-
USING_LIBSUMO: 全局标志,检测libsumo是否成功导入。 -
start_simulation_optimized:- 优先尝试使用
libsumo。如果USING_LIBSUMO为True,则构建不含--remote-port的sumo命令。 - 如果回退到 TraCI over socket,则根据
gui参数选择sumo或sumo-gui,并添加--remote-port。 - 集成了多个SUMO性能选项:
--no-step-log,--no-warnings,--threads,--seed,--time-to-teleport。
- 优先尝试使用
-
订阅机制 (
setup_subscriptions_for_detectors,get_state_from_subscription,get_reward_from_subscription):- 演示了如何为
LaneAreaDetector订阅车辆数和停止车辆数。 - 状态和奖励函数现在通过
traci.lanearea.getSubscriptionResults()获取数据,减少了直接的 TraCI 调用次数。
- 演示了如何为
-
run_one_episode_optimized:- 在仿真开始时调用
setup_subscriptions_for_detectors。 - 状态和奖励的获取现在使用基于订阅的函数。
- 增加了仿真因子 (Simulation Factor) 的计算和打印,这是一个衡量仿真速度与真实时间比例的重要指标 (
仿真时间 / 墙上时钟时间)。值越大,仿真越快。 - 学习 (
agent.learn()) 和 Epsilon 更新 (agent.update_epsilon()) 的频率被降低,以平衡计算开销(例如,每5个决策步学习一次)。这需要根据具体任务调整。 sumo_seed_offset用于为每个episode生成不同的种子,便于复现和调试。
- 在仿真开始时调用
-
主训练循环 :
- 调用
run_one_episode_optimized。 - 记录并报告平均仿真因子。
- 绘制包含奖励和仿真因子的图表。
- 调用
如何进一步应用这些技巧 :
- 基准测试 : 在应用任何优化之前和之后,进行基准测试以量化性能提升。测量每个episode的平均墙上时钟时间或仿真因子。
- 逐步优化 : 不要一次性应用所有优化。逐步引入并测试,以确保它们确实带来了好处并且没有引入错误。
- 场景特异性 : 某些优化(如线程数、步长)的效果可能因具体路网、交通需求和硬件而异。需要实验来找到最佳配置。
- RL Agent的计算成本 : 如果RL Agent的
select_action或learn方法本身非常耗时(例如,由于大型神经网络或复杂的计算),那么SUMO或TraCI的优化带来的相对增益可能会减小。这时需要同时优化Agent的计算效率。
通过综合运用这些SUMO、TraCI和Python层面的优化技巧,可以显著提升大规模、长时间联合仿真的性能,使得更复杂和更真实的交通控制研究成为可能。
第七部分:更复杂的错误处理和日志记录策略
在长时间、大规模的仿真和训练过程中,各种预料之外的情况都可能发生:SUMO仿真可能因某些极端交通状况或配置问题而崩溃,TraCI连接可能意外断开,Python脚本本身也可能出现时错误,磁盘空间可能耗尽,或者RL Agent可能进入不良状态。一个良好的错误处理和日志系统能够帮助我们:
- 尽早发现问题 :而不是在数小时或数天后才发现实验失败。
- 诊断问题根源 :通过详细的错误信息和上下文日志。
- 优雅地处理故障 :例如,保存当前进度,发送通知,尝试恢复或安全退出。
- 监控训练过程 :记录关键指标、超参数、Agent状态等,用于分析和比较。
- 确保实验可复现性 :记录所有相关的配置和随机种子。
7.1 Python 中的高级错误处理
Python 的 try...except...else...finally 结构是错误处理的基础。对于复杂的应用,我们可以扩展它:
捕获特定异常 :
* 避免使用笼统的 `except Exception:`,因为它会捕获所有类型的同步异常,包括 `SystemExit` 或 `KeyboardInterrupt`,这可能不是你想要的。
* 尽可能捕获具体的异常类型,例如 `traci.TraCIException`, `ConnectionRefusedError`, `FileNotFoundError`, `ValueError`, `TypeError` 等。
* 可以一次捕获多个特定异常:`except (traci.TraCIException, socket.error) as e:`
异常链 (Exception Chaining) :
* 当你在捕获一个异常后又抛出另一个新的异常时,Python 3 允许你保留原始异常的上下文信息,这对于调试非常有用。
* `raise NewException("Something went wrong") from original_exception`
自定义异常 :
* 为你的应用程序定义特定的异常类,可以使错误处理逻辑更清晰。
class SimulationConfigError(ValueError): # 自定义仿真配置错误,继承自ValueError
"""当SUMO仿真配置存在问题时抛出。"""
pass # 无需额外实现
class AgentStateError(RuntimeError): # 自定义Agent状态错误,继承自RuntimeError
"""当RL Agent进入无效或意外状态时抛出。"""
def __init__(self, message, agent_id=None, current_state_sample=None): # 初始化函数
super().__init__(message) # 调用父类初始化
self.agent_id = agent_id # 保存Agent ID
self.current_state_sample = current_state_sample # 保存当前状态样本
# --- 在代码中使用 ---
# if not valid_config(config_path): # 如果配置无效
# raise SimulationConfigError(f"Invalid SUMO configuration found at {config_path}") # 抛出自定义配置错误
# if agent.is_in_bad_state(): # 如果Agent处于不良状态
# raise AgentStateError("Agent entered a critical bad state.",
# agent_id=agent.id,
# current_state_sample=agent.get_current_internal_state()) # 抛出自定义Agent状态错误
python

finally 块的重要性:
* `finally` 块中的代码无论 `try` 块中是否发生异常都会执行。这对于资源清理非常重要,例如关闭TraCI连接、关闭文件句柄、释放锁等。
# try:
# traci.start(...) # 尝试启动TraCI
# # ... 仿真和学习 ...
# except KeyboardInterrupt: # 捕获键盘中断异常
# print("Training interrupted by user.") # 打印用户中断信息
# except Exception as e: # 捕获其他异常
# print(f"An unexpected error occurred: {e}") # 打印意外错误信息
# # 在这里可以记录更详细的错误信息
# finally:
# if traci.isLoaded(): # 检查TraCI是否已加载 (表示连接仍然有效或部分有效)
# print("Closing TraCI connection...") # 打印关闭TraCI连接信息
# traci.close() # 关闭TraCI连接
# # 其他清理操作,如保存模型检查点
python

7.2 健壮的 TraCI 连接管理和错误处理
TraCI 调用是与外部进程 SUMO 的交互,容易出错。
检查连接状态 :
* 在进行 TraCI 调用(尤其是耗时操作或关键操作)之前和之后,可以检查连接状态。
* `traci.isLoaded()` 可以用来检查 TraCI 是否认为自己已经连接并加载了一个场景。但这并不总是能完全反映SUMO进程的健康状况。
* `traci.simulation.getMinExpectedNumber()` 返回SUMO期望的客户端连接数。如果仿真正常且只有一个Python客户端,它应该返回大于0的数。如果SUMO崩溃或意外关闭,它通常会返回0或负数,或者TraCI调用本身会抛出异常。
TraCIException 处理 :
* 几乎所有的 TraCI 函数在执行失败时(例如,SUMO返回错误,对象不存在,命令格式错误等)都会抛出 `traci.TraCIException`。
* 捕获这个异常并分析其消息内容可以帮助理解SUMO端发生了什么。
# vehicle_id = "non_existent_vehicle" # 一个不存在的车辆ID
# try:
# speed = traci.vehicle.getSpeed(vehicle_id) # 尝试获取车辆速度
# print(f"Speed of {vehicle_id}: {speed}") # 打印车辆速度
# except traci.TraCIException as e: # 捕获TraCI异常
# print(f"TraCI Error: Failed to get speed for {vehicle_id}. Reason: {e}") # 打印TraCI错误信息
# # 日志记录: logger.error(f"TraCI Error for {vehicle_id}: {e}", exc_info=True)
python
超时和重试机制 (谨慎使用) :
* 对于某些 TraCI 调用,如果怀疑是由于瞬时的网络问题或SUMO负载过高导致的失败,可以考虑实现一个简单的重试机制。
* **警告** : 盲目重试可能会掩盖更深层次的问题,或者在SUMO已经崩溃的情况下导致脚本卡死。重试应该有次数限制和超时。
* 对于 `traci.simulationStep()`,如果它阻塞了,通常意味着SUMO内部有问题,重试可能没有帮助。
# def traci_call_with_retry(func, *args, retries=3, delay=1, **kwargs): # 定义带重试的TraCI调用函数
# """
# 尝试执行一个TraCI函数,并在失败时进行重试。
# :param func: 要调用的TraCI函数 (例如 traci.vehicle.getSpeed)。
# :param args: func 的位置参数。
# :param retries: int, 最大重试次数。
# :param delay: int, 重试间的延迟秒数。
# :param kwargs: func 的关键字参数。
# :return: func 的返回值。
# :raises: TraCIException 如果所有重试都失败。
# """
# last_exception = None # 初始化上一个异常
# for attempt in range(retries): # 循环指定的重试次数
# try:
# return func(*args, **kwargs) # 尝试执行函数并返回结果
# except traci.TraCIException as e: # 捕获TraCI异常
# print(f"TraCI call failed (attempt {attempt + 1}/{retries}): {e}. Retrying in {delay}s...") # 打印失败信息
# last_exception = e # 保存当前异常
# time.sleep(delay) # 等待指定的延迟时间
# raise last_exception # 如果所有重试都失败,则抛出最后一个异常
# # 使用示例:
# # try:
# # speed = traci_call_with_retry(traci.vehicle.getSpeed, "some_vehicle_id", retries=2, delay=0.5)
# # except traci.TraCIException as e:
# # logger.error(f"Failed to get speed after multiple retries: {e}")
python

-
更好的替代方案 : 对于
libsumo,由于没有网络IPC,这类瞬时错误会少很多。主要错误会是SUMO内部逻辑错误或API使用不当。
SUMO 进程监控 :
* 如果使用标准的 TraCI (基于socket),SUMO 是一个独立的进程。Python 脚本可以尝试监控这个进程的存活状态。
* 当 `traci.start()` 被调用时,它会启动 SUMO 进程。你可以获取到这个进程的句柄 (例如,通过 `subprocess` 模块启动 SUMO,并保存 `Popen` 对象)。
* 定期检查进程是否仍在 (`popen_object.poll()` 如果返回 `None` 表示仍在)。如果进程意外终止,可以记录错误并尝试优雅地关闭Python脚本。
* `libsumo` 则不需要这种外部进程监控,因为SUMO逻辑在Python进程内。
7.3 Python logging 模块
Python 内置的 logging 模块是实现灵活、强大日志记录功能的标准方式。
基本配置 :
* `logging.basicConfig()`: 提供了一种快速配置日志记录的方法,通常在脚本开始时调用一次。
import logging # 导入logging模块
logging.basicConfig(
level=logging.INFO, # 设置日志记录的最低级别 (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format='%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s', # 日志格式
datefmt='%Y-%m-%d %H:%M:%S', # 日期时间格式
# filename='simulation_train.log', # (可选) 将日志输出到文件
# filemode='a' # (可选) 文件模式 ('w' for write, 'a' for append)
)
# 获取一个logger实例 (通常以模块名命名)
logger = logging.getLogger(__name__) # 使用当前模块名获取logger实例
# 或者 logger = logging.getLogger("SUMO_RL_Agent") # 使用自定义名称
# logger.debug("这是一个调试信息,通常用于详细诊断。") # 记录调试信息
# logger.info("仿真周期 10 开始,当前 Epsilon: 0.5") # 记录参考信息
# logger.warning("检测器 'det_X' 返回了意外的空值。") # 记录警告信息
# agent_id = "agent_0" # 示例Agent ID
# error_code = "E001" # 示例错误码
# logger.error(f"Agent {agent_id} 无法计算奖励,错误码: {error_code}") # 记录错误信息
# critical_error_msg = "磁盘空间严重不足,无法保存模型!" # 示例严重错误信息
# logger.critical(critical_error_msg) # 记录严重错误信息
python

日志级别 (Log Levels) :
* `DEBUG`: 最详细的信息,通常只在诊断问题时有用。
* `INFO`: 确认事情按预期工作的一般性信息。
* `WARNING`: 表明发生了一些意外情况,或者将来可能出现问题(例如,磁盘空间不足)。程序仍然按预期工作。
* `ERROR`: 由于一个更严重的问题,程序 نتوانست برخی از عملکردها را انجام دهد.
* `CRITICAL`: 一个严重的错误,表明程序本身可能无法继续。
Handlers (处理器) :
* Handlers 负责将日志记录(由 Logger 生成)发送到指定的目标。
* `StreamHandler`: 发送到流,如 `sys.stdout` 或 `sys.stderr` (basicConfig 默认使用)。
* `FileHandler`: 发送到磁盘文件。
* `RotatingFileHandler`: 当日志文件达到一定大小时,会自动轮转(例如,重命名旧文件并创建新文件)。
* `TimedRotatingFileHandler`: 根据时间间隔(例如每天、每小时)轮转日志文件。
* 可以为一个 Logger 添加多个 Handler,例如,同时输出到控制台和文件,并且可以为不同的 Handler 设置不同的日志级别和格式。
Formatters (格式化器) :
* Formatters 定义了最终日志记录的布局和内容。
* `logging.Formatter(format_string, date_format_string)`
* 常用的格式化占位符:
* `%(asctime)s`: 日期时间。
* `%(levelname)s`: 日志级别名称 (e.g., INFO, ERROR)。
* `%(name)s`: Logger 的名称。
* `%(module)s`: 模块名。
* `%(funcName)s`: 函数名。
* `%(lineno)d`: 行号。
* `%(message)s`: 日志消息本身。
* `%(thread)d`, `%(threadName)s`: 线程信息。
* `%(process)d`: 进程ID。
Filters (过滤器) :
* Filters 提供了更细致的控制,决定哪些日志记录可以被 Handler 处理。可以基于日志级别、Logger 名称或其他自定义逻辑进行过滤。
配置文件 :
* 对于复杂的日志配置,可以使用文件(例如 INI 格式或 JSON/YAML)来定义 Logger、Handler、Formatter 和 Filter,然后使用 `logging.config.fileConfig()` 或 `logging.config.dictConfig()` 加载配置。这使得日志配置与代码分离,更易于管理。
示例:配置多个 Handler 和一个自定义 Logger
import logging # 导入logging模块
import logging.handlers # 导入logging的handlers模块
def setup_main_logger(log_file_path="training_run.log", console_level=logging.INFO, file_level=logging.DEBUG): # 定义设置主logger的函数
"""配置一个具有控制台和文件输出的logger。"""
# 获取/创建 logger (避免使用 root logger, 除非你知道你在做什么)
logger = logging.getLogger("SUMO_TRAFFIC_RL") # 获取名为 "SUMO_TRAFFIC_RL" 的logger实例
logger.setLevel(logging.DEBUG) # 设置logger的最低处理级别为DEBUG,这样才能传递给更高级别的handler
# 防止重复添加handlers (如果此函数被多次调用)
if logger.hasHandlers(): # 如果logger已经有handler
logger.handlers.clear() # 清除已有的handler
# 创建控制台 handler
console_handler = logging.StreamHandler() # 创建StreamHandler用于控制台输出
console_handler.setLevel(console_level) # 设置控制台handler的级别
# 创建文件 handler (例如,每天轮转)
# RotatingFileHandler: 按大小轮转
# file_handler = logging.handlers.RotatingFileHandler(
# log_file_path, maxBytes=5*1024*1024, backupCount=5, encoding='utf-8') # 5MB每个文件, 保留5个备份
# TimedRotatingFileHandler: 按时间轮转
file_handler = logging.handlers.TimedRotatingFileHandler(
log_file_path, when="midnight", interval=1, backupCount=7, encoding='utf-8' # 每天午夜轮转,保留7天备份
)
file_handler.setLevel(file_level) # 设置文件handler的级别
# 创建 formatter 并将其添加到 handlers
formatter_detailed = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(module)s.%(funcName)s:%(lineno)d - %(message)s', # 详细格式
datefmt='%Y-%m-%d %H:%M:%S' # 日期时间格式
)
formatter_simple = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # 简单格式
console_handler.setFormatter(formatter_simple) # 为控制台handler设置简单格式
file_handler.setFormatter(formatter_detailed) # 为文件handler设置详细格式
# 将 handlers 添加到 logger
logger.addHandler(console_handler) # 添加控制台handler到logger
logger.addHandler(file_handler) # 添加文件handler到logger
return logger # 返回配置好的logger
# --- 在主脚本中使用 ---
# if __name__ == "__main__":
# main_logger = setup_main_logger(log_file_path="logs/rl_experiment.log",
# console_level=logging.INFO,
# file_level=logging.DEBUG) # 设置主logger
#
# main_logger.info("--- Experiment Started ---") # 记录实验开始信息
# main_logger.info(f"Script arguments: {sys.argv}") # 记录脚本参数
# # ... (记录超参数, 配置信息等) ...
#
# sumo_cfg = "config.sumocfg" # 示例配置文件
# try:
# main_logger.debug(f"Attempting to load SUMO config: {sumo_cfg}") # 记录尝试加载配置信息
# if not os.path.exists(sumo_cfg): # 如果配置文件不存在
# raise FileNotFoundError(f"SUMO config file {sumo_cfg} not found.") # 抛出文件未找到错误
#
# # ... (traci.start, 仿真循环) ...
# for episode in range(10): # 示例循环
# main_logger.info(f"Starting Episode {episode + 1}") # 记录开始轮次信息
# time.sleep(0.1) # 模拟工作
# if episode == 5: # 模拟一个错误
# # main_logger.warning("Simulated warning: High traffic density detected.") # 记录模拟警告
# raise ValueError("Simulated error during episode 5 processing.") # 抛出模拟错误
# main_logger.debug(f"Episode {episode + 1} internal step completed.") # 记录轮次内部步骤完成信息
#
# except FileNotFoundError as fnf_err: # 捕获文件未找到错误
# main_logger.critical(f"Configuration Error: {fnf_err}", exc_info=False) # 记录严重配置错误 (exc_info=False因为我们已经知道是文件问题)
# except traci.TraCIException as traci_err: # 捕获TraCI异常
# main_logger.error(f"A TraCI error occurred during simulation: {traci_err}", exc_info=True) # 记录TraCI错误 (exc_info=True会记录堆栈跟踪)
# except KeyboardInterrupt: # 捕获键盘中断异常
# main_logger.info("--- Experiment Interrupted by User ---") # 记录用户中断实验信息
# except Exception as e: # 捕获其他所有异常
# main_logger.critical(f"--- An Unhandled Critical Error Occurred --- : {e}", exc_info=True) # 记录未处理的严重错误
# finally:
# if 'traci' in sys.modules and traci.isLoaded(): # 检查TraCI是否加载
# main_logger.info("Closing SUMO connection in finally block.") # 记录在finally块中关闭SUMO连接信息
# traci.close() # 关闭TraCI连接
# main_logger.info("--- Experiment Finished (or Terminated) ---") # 记录实验结束信息
python

7.4 记录关键的RL训练信息
除了错误和一般程序流日志,对于RL训练,还需要记录特定的信息:
超参数 :
* 在实验开始时记录所有重要的超参数:学习率、折扣因子、epsilon参数、网络架构、批次大小、优化器类型、经验回放缓冲区大小等。
* 这对于结果复现和比较不同实验设置至关重要。
环境配置 :
* SUMO配置文件路径 (`.sumocfg`)。
* 路网文件、车流文件、附加文件。
* 关键的SUMO仿真参数(步长、随机种子等)。
* 状态表示方法、动作空间定义、奖励函数描述(或其关键参数)。
训练过程指标 (通常每轮或每N轮记录) :
* **Episode Number** : 当前轮次。
* **Total Reward per Episode** : 每轮的总奖励。
* **Average Reward (e.g., over last 100 episodes)** : 滑动平均奖励,用于观察趋势。
* **Episode Length (steps)** : 每轮持续的仿真步数或决策步数。
* **Loss (e.g., DQN loss)** : 训练期间神经网络的损失函数值。可以记录平均损失或每个训练批次的损失。
* **Epsilon (for ε-greedy exploration)** : 当前的探索率。
* **Q-values (for value-based methods)** : (可选) 抽样一些代表性状态的最大Q值或平均Q值,以监控价值函数的学习情况。
* **Wall-clock time per episode/per N steps** : 实际花费的时间,用于评估效率。
* **Simulation Factor** : 如前所述,衡量仿真速度。
* **Replay Buffer size/details** : 缓冲区当前大小,或者采样经验的统计信息。
模型检查点 (Model Checkpoints) :
* 定期保存RL Agent的模型参数(例如神经网络的权重)。
* 不仅要在训练结束时保存,还要在训练过程中(例如每隔M轮,或者当性能达到新的最佳时)保存。
* 检查点文件名应包含有用的信息,如轮次、得分等(例如 `dqn_agent_ep500_reward120.5.pth`)。
* 这允许从中断处恢复训练,或者加载之前性能较好的模型。
评估结果 :
* 在训练过程中,定期将Agent置于评估模式(例如,固定epsilon为一个很小的值,不进行学习更新),在单独的评估环境或固定的评估场景中N轮,并记录其平均性能。这能更客观地衡量Agent的学习进展,因为训练过程中的性能可能因探索而波动。
版本控制信息 :
* 记录当前代码的Git提交哈希 (commit hash),以确保能够准确回溯到生成该结果的特定代码版本。
* `git rev-parse HEAD` 命令可以获取当前HEAD的提交哈希。
集成日志与TensorBoard/Weights & Biases:
对于RL训练指标的可视化和跟踪,专门的工具如 TensorBoard (来自TensorFlow,但可独立使用或与PyTorch集成) 或 Weights & Biases (W&B) 非常有用。
TensorBoard :
* PyTorch通过 `torch.utils.tensorboard.SummaryWriter` 提供接口。
* 可以记录标量值(奖励、损失、epsilon)、直方图(权重、梯度)、图像(例如状态的图像化表示)、文本等。
from torch.utils.tensorboard import SummaryWriter # 导入SummaryWriter
# 在训练开始前初始化 (通常logdir包含实验名称和时间戳)
# writer = SummaryWriter('runs/sumo_rl_experiment_1') # 创建SummaryWriter实例
# # 在训练循环中记录:
# # writer.add_scalar('Training/TotalReward', total_reward_episode, global_step=episode_num)
# # writer.add_scalar('Training/AvgLoss', avg_loss_this_epoch, global_step=episode_num)
# # writer.add_scalar('Parameters/Epsilon', agent.epsilon, global_step=episode_num)
# # writer.add_hparams({'lr': LR, 'batch_size': BS},
# # {'metrics/avg_reward_eval': eval_reward}) # 记录超参数和最终指标
# # 结束后关闭writer
# # writer.close()
python

- 然后可以通过在命令行
tensorboard --logdir=runs来启动TensorBoard服务器查看图表。
Weights & Biases (wandb):
* 一个第三方平台,提供更丰富的实验跟踪、可视化、协作和报告功能。
* 需要安装 `wandb` 库并注册账户。
* 使用起来非常简单:
# import wandb # 导入wandb
# # 1. 初始化 (在脚本开始时)
# # wandb.init(project="sumo_traffic_rl", entity="your_wandb_username",
# # config={ # 记录超参数
# # "learning_rate": 0.001,
# # "architecture": "DQN_SimpleFC",
# # "gamma": 0.99,
# # # ... 更多配置
# # })
# # 2. 记录指标 (在训练循环中)
# # wandb.log({
# # "Episode": episode_num,
# # "Total Reward": total_reward_episode,
# # "Average Loss": avg_loss,
# # "Epsilon": agent.epsilon,
# # "Custom Chart/Queue Length N": q_n_value
# # }) # 记录指标
# # (可选) 保存模型到wandb
# # wandb.save("model.h5")
# # 3. (可选) 结束 (wandb.finish() 通常在脚本结束时自动调用)
python

结合Python的 logging 模块(用于详细的文本日志和错误记录)和 TensorBoard/W&B(用于指标可视化和实验跟踪),可以构建一个非常全面和强大的日志系统。
7.5 实践中的策略
分级日志 :
* 开发和调试时,将控制台日志级别设为 `DEBUG` 或 `INFO`,文件日志级别设为 `DEBUG`。
* 正式或部署时,控制台级别可以提高到 `INFO` 或 `WARNING`,文件日志仍然可以是 `INFO` 或 `DEBUG`(用于事后分析)。
结构化日志 :
* 考虑使用JSON格式或其他结构化格式记录日志,特别是当日志需要被其他工具(如Elasticsearch/Logstash/Kibana (ELK)堆栈,Splunk)解析和分析时。
* Python有如 `python-json-logger` 这样的库可以帮助实现。
异常报告服务 :
* 对于生产环境或非常重要的长期实验,可以集成如 Sentry 或 Rollbar 这样的异常报告服务。它们可以捕获未处理的异常,聚合它们,并提供一个Web界面来查看和管理错误。
上下文信息 :
* 当记录错误或警告时,尽可能多地包含相关的上下文信息。例如:
* 当前的仿真时间、轮次、步骤。
* 涉及的车辆ID、交叉口ID、检测器ID。
* Agent的当前状态(或其摘要)。
* 导致问题的输入数据。
* `logging` 模块的 `exc_info=True` 参数在 `logger.error()`, `logger.exception()`, `logger.critical()` 中会自动添加异常信息(包括堆栈跟踪)。`logger.exception()` 行为类似 `logger.error()` 但总是会记录异常信息,通常在 `except` 块中使用。
资源监控日志 :
* 定期记录系统资源使用情况,如CPU利用率、内存使用量、磁盘空间。这可以帮助发现资源泄漏或预测潜在的资源耗尽问题。
* 可以使用 `psutil` 库来获取这些信息。
# import psutil # 导入psutil库
# import os # 导入os模块
# def log_system_resources(logger_instance): # 定义记录系统资源的函数
# try:
# process = psutil.Process(os.getpid()) # 获取当前进程对象
# cpu_percent = process.cpu_percent(interval=0.1) # 获取CPU使用率 (阻塞0.1秒采样)
# mem_info = process.memory_info() # 获取内存信息
# rss_mb = mem_info.rss / (1024 * 1024) # Resident Set Size in MB
# vms_mb = mem_info.vms / (1024 * 1024) # Virtual Memory Size in MB
#
# disk_usage = psutil.disk_usage('/') # 获取根目录磁盘使用情况 (根据需要修改路径)
# disk_free_gb = disk_usage.free / (1024 * 1024 * 1024) # 可用磁盘空间 (GB)
#
# logger_instance.info(
# f"Resource Snapshot: CPU: {cpu_percent:.1f}%, " # 记录CPU使用率
# f"MemRSS: {rss_mb:.1f}MB, MemVMS: {vms_mb:.1f}MB, " # 记录内存使用
# f"DiskFree: {disk_free_gb:.1f}GB" # 记录可用磁盘空间
# )
# except Exception as e: # 捕获异常
# logger_instance.warning(f"Could not log system resources: {e}") # 记录获取资源失败警告
# # 在训练循环中定期调用:
# # if step % 1000 == 0: # 每1000步记录一次
# # log_system_resources(main_logger)
python

优雅关闭和状态保存 :
* 当捕获到严重错误或 `KeyboardInterrupt` 时,在 `finally` 块中:
* 安全关闭SUMO (`traci.close()`)。
* 保存RL Agent的最新模型检查点。
* 保存经验回放缓冲区的内容(如果它很大并且包含有价值的经验)。
* 记录一个明确的“非正常终止”消息。
通过实施这些更复杂的错误处理和日志记录策略,你的 SUMO + Python + RL 项目将变得更加健壮、易于管理和调试,这对于推动研究和开发至关重要,尤其是在处理复杂系统和长期实验时。
