核心内容摘要
XXXXL19D18—19D20:解锁你的无限可能,预见非凡的未来
DQN 用max Q(s,a)计算目标值等于在挑 Q 值最高的动作但是这些动作中包括了那些因为估计噪声而被高估的动作素以就会产生过估计偏差直接后果是训练不稳定、策略次优。
这篇文章要解决的就是这个问题内容包括DQN 为什么会过估计、Double DQN 怎么把动作选择和评估拆开、Dueling DQN 怎么分离状态值和动作优势、优先经验回放如何让采样更聪明以及用 PyTorch 从头实现这些改进。
最后还会介绍一个 CleanRL 的专业实现。
过估计问题DQN 的目标值如下y r γ·maxₐ Q(s, a; θ⁻)问题就在于同一个网络既负责选动作a* argmax Q又负责评估这个动作的价值。
Q 值本身是带噪声的估计所以有时候噪声会让差动作的 Q 值偏高取 max 操作天然偏向选那些被高估的动作。
数学上有个直观的解释E[max(X₁, X₂, ..., Xₙ)] ≥ max(E[X₁], E[X₂], ..., E[Xₙ])最大值的期望总是大于等于期望的最大值这是凸函数的 Jensen 不等式。
过估计会导致收敛变慢智能体把时间浪费在探索那些被高估的动作上。
其次是策略质量打折扣高噪声的动作可能比真正好的动作更受青睐。
更糟的是过估计会不断累积导致训练发散。
泛化能力也会受损——在状态空间的噪声区域智能体会表现得过于自信。
Double DQN把选择和评估拆开标准 DQN 一个网络干两件事a* argmaxₐ Q(s, a; θ⁻) # 选最佳动作 y r γ · Q(s, a*; θ⁻) # 评估这个动作同一个网络Double DQN 用两个网络各管一件a* argmaxₐ Q(s, a; θ) # 用当前网络选 y r γ · Q(s, a*; θ⁻) # 用目标网络评估当前网络θ选动作目标网络θ⁻评估。
两个网络的误差不相关这样最大化偏差就被打破了。
为什么有效呢假设当前网络把动作 a 的价值估高了目标网络参数不同大概率不会犯同样的错。
误差相互独立倾向于抵消而非累加。
最通俗的解释就是DQN 像是自己给菜打分、自己挑菜吃这样烂菜可能就混进来了而Double DQN 让朋友打分、你来挑两边的误差对冲掉了。
Standard DQN: E[Q(s, argmaxₐ Q(s,a))] ≥ maxₐ E[Q(s,a)] 有偏 Double DQN: E[Q₂(s, argmaxₐ Q₁(s,a))] ≈ maxₐ E[Q(s,a)] 无偏从 DQN 到 Double DQN只需要改一行# DQN 目标 next_q_valuestarget_network(next_states).max(
[0] targetrewardsgamma*next_q_values* (1-dones) # Double DQN 目标 next_actionscurrent_network(next_states).argmax(
# - 用当前网络选 next_q_valuestarget_network(next_states).gather(1, next_actions.unsqueeze(
) # - 用目标网络评估 targetrewardsgamma*next_q_values.squeeze() * (1-dones)就这一行改动极小效果却很明显。
实现Double DQN扩展 DQN AgentclassDoubleDQNAgent(DQNAgent): Double DQN: 通过解耦动作选择和评估来减少过估计偏差。
def__init__(self, *args, **kwargs): 初始化 Double DQN agent。
从 DQN 继承所有内容只改变目标计算。
super().__init__(*args, **kwargs) defupdate(self) -Dict[str, float]: 执行 Double DQN 更新。
Returns: metrics: 训练指标 iflen(self.replay_buffer) self.batch_size: return {} # 采样批次 states, actions, rewards, next_states, donesself.replay_buffer.sample( self.batch_size ) statesstates.to(self.device) actionsactions.to(self.device) rewardsrewards.to(self.device) next_statesnext_states.to(self.device) donesdones.to(self.device) # 当前 Q 值 Q(s,a;θ) current_q_valuesself.q_network(states).gather(1, actions.unsqueeze(
) # Double DQN 目标计算 withtorch.no_grad(): # 使用当前网络选择动作 next_actionsself.q_network(next_states).argmax(
# 使用目标网络评估动作 next_q_valuesself.target_network(next_states).gather( 1, next_actions.unsqueeze(
).squeeze() # 计算目标 target_q_valuesrewards (1-dones) *self.gamma*next_q_values # 计算损失 lossF.mse_loss(current_q_values.squeeze(), target_q_values) # 梯度下降 self.optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), max_norm
10.
self.optimizer.step() self.training_step1 return { loss: loss.item(), q_mean: current_q_values.mean().item(), q_std: current_q_values.std().item(), target_q_mean: target_q_values.mean().item() }训练函数deftrain_double_dqn( env_name: str, n_episodes: int1000, max_steps: int500, train_freq: int1, eval_frequency: int50, eval_episodes: int10, verbose: boolTrue, **kwargs ) -Tuple: 训练 Double DQN agent使用 DoubleDQNAgent 而不是 DQNAgent。
# 与 train_dqn 相同但使用 DoubleDQNAgent envgym.make(env_name) eval_envgym.make(env_name) state_dimenv.observation_space.shape[0] action_dimenv.action_space.n # 使用 DoubleDQNAgent agentDoubleDQNAgent( state_dimstate_dim, action_dimaction_dim, **kwargs ) # 训练循环与 DQN 相同 stats { episode_rewards: [], episode_lengths: [], losses: [], q_values: [], target_q_values: [], eval_rewards: [], eval_episodes: [], epsilons: [] } print(fTraining Double DQN on {env_name}) print(fState dim: {state_dim}, Action dim: {action_dim}) print(*
forepisodeinrange(n_episodes): state, _env.reset() episode_reward0 episode_length0 episode_metrics [] forstepinrange(max_steps): actionagent.select_action(state, trainingTrue) next_state, reward, terminated, truncated, _env.step(action) doneterminatedortruncated agent.store_transition(state, action, reward, next_state, done) ifstep%train_freq0: metricsagent.update() ifmetrics: episode_metrics.append(metrics) episode_rewardreward episode_length1 statenext_state ifdone: break # 更新目标网络 if (episode
%kwargs.get(target_update_freq,
0: agent.update_target_network() agent.decay_epsilon() # 存储统计信息 stats[episode_rewards].append(episode_reward) stats[episode_lengths].append(episode_length) stats[epsilons].append(agent.epsilon) ifepisode_metrics: stats[losses].append(np.mean([m[loss] forminepisode_metrics])) stats[q_values].append(np.mean([m[q_mean] forminepisode_metrics])) stats[target_q_values].append(np.mean([m[target_q_mean] forminepisode_metrics])) # 评估 if (episode
%eval_frequency0: eval_rewardevaluate_dqn(eval_env, agent, eval_episodes) stats[eval_rewards].append(eval_reward) stats[eval_episodes].append(episode
ifverbose: avg_rewardnp.mean(stats[episode_rewards][-50:]) avg_lossnp.mean(stats[losses][-50:]) ifstats[losses] else0 avg_qnp.mean(stats[q_values][-50:]) ifstats[q_values] else0 print(fEpisode {episode1:4d} | fReward: {avg_reward:
2f} | fEval: {eval_reward:
2f} | fLoss: {avg_loss:
4f} | fQ: {avg_q:
2f} | fε: {agent.epsilon:.3f}) env.close() eval_env.close() print(*
print(Training complete!) returnagent, statsLunarLander-v3# 训练 Double DQN if__name____main__: devicecudaiftorch.cuda.is_available() elsecpu agent_ddqn, stats_ddqntrain_double_dqn( env_nameLunarLander-v3, n_episodes4000, max_steps1000, learning_rate5e-4, gamma
99, epsilon_start
0, epsilon_end
01, epsilon_decay
9995, buffer_capacity100000, batch_size128, target_update_freq20, train_freq4, eval_frequency100, eval_episodes10, hidden_dims[256, 256], devicedevice, verboseTrue ) # 保存模型 agent_ddqn.save(doubledqn_lunar_lander.pth)输出Training Double DQN on LunarLander-v3 State dim: 8, Action dim: 4 Episode 100 | Reward: -
1
24 | Eval: -
8
72 | Loss:
5
9057 | Q:
20 | ε:
951 Episode 200 | Reward: -
1
85 | Eval: -
8
94 | Loss:
3
2449 | Q:
14 | ε:
905 Episode 300 | Reward: -
1
61 | Eval: -
1
48 | Loss:
3
4279 | Q:
52 | ε:
861 Episode 400 | Reward: -
9
21 | Eval: -
1
43 | Loss:
4
5296 | Q:
15 | ε:
819 Episode 500 | Reward: -
8
75 | Eval: -
1
26 | Loss:
5
2701 | Q:
1
70 | ε:
779 ... Episode 3200 | Reward:
1
04 | Eval:
1
71 | Loss:
1
5263 | Q:
2
94 | ε:
202 Episode 3300 | Reward:
1
37 | Eval:
1
79 | Loss:
2
5564 | Q:
2
81 | ε:
192 Episode 3400 | Reward:
1
08 | Eval:
2
40 | Loss:
2
2846 | Q:
3
40 | ε:
183 Episode 3500 | Reward:
1
33 | Eval:
2
32 | Loss:
2
8558 | Q:
3
51 | ε:
174 Episode 3600 | Reward:
1
80 | Eval:
2
42 | Loss:
2
6430 | Q:
3
18 | ε:
165 Episode 3700 | Reward:
1
59 | Eval:
2
56 | Loss:
2
8328 | Q:
3
65 | ε:
157 Episode 3800 | Reward:
1
82 | Eval:
2
36 | Loss:
2
3445 | Q:
3
46 | ε:
149 Episode 3900 | Reward:
1
70 | Eval:
2
99 | Loss:
3
2971 | Q:
4
22 | ε:
142 Episode 4000 | Reward:
1
60 | Eval:
2
17 | Loss:
4
7266 | Q:
4
15 | ε:
135 Training complete!Dueling DQN分离值和优势很多状态下选哪个动作其实差别不大。
CartPole 里杆子刚好平衡时向左向右都行开车走直线方向盘微调的结果差不多LunarLander 离地面还远的时候引擎怎么喷影响也有限。
标准 DQN 对每个动作单独学 Q(s,a)把网络容量浪费在冗余信息上。
Dueling DQN 的思路是把 Q 拆成两部分V(s) 表示这个状态本身值多少A(s,a) 表示这个动作比平均水平好多少。
架构如下标准 DQN: Input - Hidden Layers - Q(s,a₁), Q(s,a₂), ..., Q(s,aₙ) Dueling DQN: |- Value Stream - V(s) Input - Shared Layers | |- Advantage Stream - A(s,a₁), A(s,a₂), ..., A(s,aₙ) Q(s,a) V(s) (A(s,a) - mean(A(s,·)))为什么要减去均值不减的话任何常数加到 V 再从 A 减掉得到的 Q 完全一样网络学不出唯一解。
数学表达如下Q(s,a) V(s) A(s,a) - (1/|A|)·Σₐ A(s,a)也可以用 max 代替 meanQ(s,a) V(s) A(s,a) - maxₐ A(s,a)实践中 max 版本有时效果更好。
举个例子V(s) 10好动作的 A 是 5差动作的 A 是 -3平均优势 (5-
/2 1。
那么 Q(s, 好动作) 10 5 - 1 14Q(s, 差动作) 10 - 3 - 1 6。
实现classDuelingQNetwork(nn.Module): Dueling DQN 架构分离值和优势。
理论: Q(s,a) V(s) A(s,a) - mean(A(s,·)) def__init__( self, state_dim: int, action_dim: int, hidden_dims: List[int] [128, 128] ): 初始化 Dueling Q 网络。
Args: state_dim: 状态空间维度 action_dim: 动作数量 hidden_dims: 共享层大小 super(DuelingQNetwork, self).__init__() self.state_dimstate_dim self.action_dimaction_dim # 共享特征提取器 shared_layers [] input_dimstate_dim forhidden_diminhidden_dims: shared_layers.append(nn.Linear(input_dim, hidden_dim)) shared_layers.append(nn.ReLU()) input_dimhidden_dim self.shared_networknn.Sequential(*shared_layers) # 值流: V(s) 状态的标量值 self.value_streamnn.Sequential( nn.Linear(hidden_dims[-1],
, nn.ReLU(), nn.Linear(128,
) # 优势流: A(s,a) 每个动作的优势 self.advantage_streamnn.Sequential( nn.Linear(hidden_dims[-1],
, nn.ReLU(), nn.Linear(128, action_dim) ) # 初始化权重 self.apply(self._init_weights) def_init_weights(self, module): 初始化网络权重。
ifisinstance(module, nn.Linear): nn.init.kaiming_normal_(module.weight, nonlinearityrelu) nn.init.constant_(module.bias,
0.
defforward(self, state: torch.Tensor) -torch.Tensor: 通过 dueling 架构的前向传播。
Args: state: 状态批次, 形状 (batch_size, state_dim) Returns: q_values: 所有动作的 Q(s,a), 形状 (batch_size, action_dim) # 共享特征 featuresself.shared_network(state) # 值: V(s) - 形状 (batch_size,
valueself.value_stream(features) # 优势: A(s,a) - 形状 (batch_size, action_dim) advantagesself.advantage_stream(features) # 组合: Q(s,a) V(s) A(s,a) - mean(A(s,·)) q_valuesvalueadvantages-advantages.mean(dim1, keepdimTrue) returnq_values defget_action(self, state: np.ndarray, epsilon: float
0.
-int: 使用 ε-greedy 策略选择动作。
ifrandom.random() epsilon: returnrandom.randint(0, self.action_dim-
else: withtorch.no_grad(): state_tensortorch.FloatTensor(state).unsqueeze(
.to( next(self.parameters()).device ) q_valuesself.forward(state_tensor) returnq_values.argmax(dim
.item()Dueling 架构的好处在动作影响不大的状态下学得更好梯度流动更通畅所以收敛更快值估计也更稳健。
还可以把两种改进叠在一起做成Double Dueling DQNclassDoubleDuelingDQNAgent(DoubleDQNAgent): 结合 Double DQN 和 Dueling DQN 的智能体。
def__init__( self, state_dim: int, action_dim: int, hidden_dims: List[int] [128, 128], **kwargs ): 初始化 Double Dueling DQN 智能体。
使用 DuelingQNetwork 而不是标准 QNetwork。
# 暂不调用 super().__init__() # 我们需要以不同方式设置网络 self.state_dimstate_dim self.action_dimaction_dim self.gammakwargs.get(gamma,
0.
self.batch_sizekwargs.get(batch_size,
self.target_update_freqkwargs.get(target_update_freq,
self.devicetorch.device(kwargs.get(device, cpu)) # 探索 self.epsilonkwargs.get(epsilon_start,
1.
self.epsilon_endkwargs.get(epsilon_end,
0.
self.epsilon_decaykwargs.get(epsilon_decay,
0.
# 使用 Dueling 架构 self.q_networkDuelingQNetwork( state_dim, action_dim, hidden_dims ).to(self.device) self.target_networkDuelingQNetwork( state_dim, action_dim, hidden_dims ).to(self.device) self.target_network.load_state_dict(self.q_network.state_dict()) self.target_network.eval() # 优化器 learning_ratekwargs.get(learning_rate, 1e-
self.optimizertorch.optim.Adam(self.q_network.parameters(), lrlearning_rate) # 回放缓冲区 buffer_capacitykwargs.get(buffer_capacity,
self.replay_bufferReplayBuffer(buffer_capacity) # 统计 self.episode_count0 self.training_step0 # update() 方法继承自 DoubleDQNAgent优先经验回放不是所有经验都同等有价值。
TD 误差大的转换说明预测偏离现实能学到东西TD 误差小的转换说明已经学得差不多了再采到也没多大用。
均匀采样把所有转换一视同仁浪费了学习机会。
优先经验回放的思路是让重要的转换被采到的概率更高。
优先级怎么算pᵢ |δᵢ| ε 其中: δᵢ r γ·max Q(s,a) - Q(s,a) TD 误差 ε 小常数保证所有转换都有被采到的可能采样概率P(i) pᵢ^α / Σⱼ pⱼ^α α 控制优先化程度: α 0 - 退化成均匀采样 α 1 - 完全按优先级比例采样优先采样改了数据分布会引入偏差。
所以解决办法是用重要性采样比率来加权更新wᵢ (N · P(i))^(-β) β 控制校正力度: β 0 - 不校正 β 1 - 完全校正通常 β 从
4 开始随训练逐渐增大到
0。
实现classPrioritizedReplayBuffer: 优先经验回放缓冲区。
理论: 按 TD 误差比例采样转换。
我们可以从中学到更多的转换会被更频繁地采样。
def__init__(self, capacity: int, alpha: float
6, beta: float
0.
: Args: capacity: 缓冲区最大容量 alpha: 优先化指数0均匀, 1比例 beta: 重要性采样指数退火到
0 self.capacitycapacity self.alphaalpha self.betabeta self.beta_increment
001 # 随时间退火 beta self.buffer [] self.prioritiesnp.zeros(capacity, dtypenp.float
self.position0 defpush(self, state, action, reward, next_state, done): 以最大优先级添加转换。
理论: 新转换获得最大优先级会很快被采样。
它们的实际优先级在首次 TD 误差计算后更新。
max_priorityself.priorities.max() ifself.bufferelse
0 iflen(self.buffer) self.capacity: self.buffer.append((state, action, reward, next_state, done)) else: self.buffer[self.position] (state, action, reward, next_state, done) self.priorities[self.position] max_priority self.position (self.position
%self.capacity defsample(self, batch_size: int): 按优先级比例采样批次。
Returns: batch: 采样的转换 indices: 采样转换的索引用于优先级更新 weights: 重要性采样权重 iflen(self.buffer) self.capacity: prioritiesself.priorities else: prioritiesself.priorities[:len(self.buffer)] # 计算采样概率 probspriorities**self.alpha probs/probs.sum() # 采样索引 indicesnp.random.choice(len(self.buffer), batch_size, pprobs, replaceFalse) # 获取转换 batch [self.buffer[idx] foridxinindices] # 计算重要性采样权重 totallen(self.buffer) weights (total*probs[indices]) ** (-self.beta) weights/weights.max() # 归一化以保持稳定性 # 退火 beta self.betamin(
0, self.betaself.beta_increment) # 转换为 tensor states, actions, rewards, next_states, doneszip(*batch) statestorch.FloatTensor(np.array(states)) actionstorch.LongTensor(actions) rewardstorch.FloatTensor(rewards) next_statestorch.FloatTensor(np.array(next_states)) donestorch.FloatTensor(dones) weightstorch.FloatTensor(weights) return (states, actions, rewards, next_states, dones), indices, weights defupdate_priorities(self, indices, td_errors): 根据 TD 误差更新优先级。
Args: indices: 采样转换的索引 td_errors: 那些转换的 TD 误差 foridx, td_errorinzip(indices, td_errors): self.priorities[idx] abs(td_error) 1e-6 def__len__(self): returnlen(self.buffer)生产环境会用 sum-tree 数据结构采样复杂度是 O(log N) 而不是这里的 O(N)。
这个简化版本以可读性为优先。
DQN 变体对比几个变体各自解决什么问题呢DQN 是基线用单一网络选动作、评估动作。
它引入了目标网络来稳定移动目标问题但容易过估计 Q 值噪声让智能体去追逐根本不存在的幽灵奖励。
Double DQN 把选和评拆开。
在线网络选动作目标网络评估价值。
实测下来能有效压低不切实际的 Q 值学习曲线明显更平滑。
Dueling DQN 换了网络架构单独学 V(s) 和 A(s,a)。
它的核心认知是很多状态下具体动作的影响不大。
在 LunarLander 这种存在大量冗余动作的环境里样本效率提升明显——不用为每次引擎脉冲都重新学状态值。
Double Dueling DQN 把两边的好处结合起来既减少估计噪声又提高表示效率。
实测中这个组合最稳健达到峰值性能的速度和可靠性都优于单一改进。
实践建议变体选择对比Double DQN 跑得比 DQN 还差可能是训练不够长Double DQN 起步偶尔慢一点或者目标网络更新太频繁或者学习率偏高。
这时可以将训练时间翻倍target_update_freq 调大学习率砍
倍。
Dueling 架构没带来改善可能是环境本身不适合所有状态都很关键或者网络太小或者值流/优势流太浅。
需要对网络加宽加深确认环境里确实有中性状态。
PER 导致不稳定可能是 β 退火太快、α 设太高、重要性采样权重没归一化。
可以减慢 β 增量、α 降到
4-
0.
确认权重做了归一化。
首选 Double DQN 起步代码改动极小收益明确没有额外复杂度。
什么时候加 Dueling状态值比动作优势更重要的环境大量状态下动作值差不多需要更快收敛。
什么时候加 PER样本效率至关重要有算力预算PER 比均匀采样慢奖励稀疏帮助关注少见的成功经验。
最后Rainbow 把六项改进叠在一起Double DQN、Dueling DQN、优先经验回放、多步学习n-step returns、分布式 RLC
噪声网络参数空间探索。
多步学习把 1-step TD 换成 n-step 回报# 1-step TD: y rₜ γ·max Q(sₜ₊₁, a) # n-step: y rₜ γ·rₜ₊₁ γ²·rₜ₊₂ ... γⁿ·max Q(sₜ₊ₙ, a)好处是信用分配更清晰学习更快。
小结这篇文章从 DQN 的过估计问题讲起沿着 Double DQN、Dueling 架构、优先经验回放等等介绍下来每种改进对应一个具体的失败模式max 算子的偏差、低效的状态-动作表示、浪费的均匀采样。
从头实现这些方法能搞清楚它们为什么有效很多高级 RL 算法不过是简单想法的组合理解这些想法本身才是真正可扩展的东西。
https://avoid.overfit.cn/post/4c5835f419d840b0acb0a1eb72f92b6f作者 Jugal Gajjar