核心内容摘要
男生和女生一起愁愁:那些藏在心底的小秘密
目录
Reinforce介绍
REINFORCE baseline
证明为啥可以降低方差
计算策略梯度的方差
先处理第二项编辑
所以上述相当于找到b优化第一项
证明重要性质
示例代码
解释
Reinforce解释
代码
Reinforce介绍最原始的 REINFORCE 更新公式是其中R代表Q(S,A),也就是某个轨迹的放缩reward。
Reinfore的特点就是通过蒙特卡洛采样的方法采样一个轨迹之后得到Q(S,A)。
上述梯度计算可能方差比较大为了降低方差引入了baseline。
REINFORCE baseline
证明为啥可以降低方差对于Reinforce的这个b(s),通常取一个轨迹的滑动平均。
下面证明这个取法为啥可以降低方差。
计算策略梯度的方差
先处理第二项
所以上述相当于找到b优化第一项这是关于 b 的二次函数对 b 求导结论最优 baseline
证明重要性质因为梯度对参数求导与对动作求和无关
示例代码
解释下面的代码是使用强化学习做一个任务分配的问题机器人和任务的输入都是(x,y)的二维坐标。
之后reward是-欧式距离。
算法最后需要找到距离当前机器人最近的任务的策略。
Reinforce解释由于在这个环境里面一个reward就是一个轨迹所以reward Q(S,A)baseline 就使用滑动平均替代。
r -torch.norm(task_xy[action] - robot_xy) /
1
0 r_item float(r.detach().cpu().item()) baseline (1 - beta) * baseline beta * r_item advantage (r - baseline).detach()
代码import math import random import numpy as np import torch import torch.nn as nn import os # ------------------------- # Reproducibility # ------------------------- seed 42 random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) device cuda if torch.cuda.is_available() else cpu MODEL_PATH reinforce_cross_attn_ckpt.pth # RL ckpt # ------------------------- # Model # ------------------------- class CrossAttnChooser(nn.Module): def __init__(self, d_model32, hidden
: super().__init__() self.robot_enc nn.Sequential( nn.Linear(2, hidden), nn.Tanh(), nn.Linear(hidden, d_model), ) self.task_enc nn.Sequential( nn.Linear(2, hidden), nn.Tanh(), nn.Linear(hidden, d_model), ) self.Wq nn.Linear(d_model, d_model, biasFalse) self.Wk nn.Linear(d_model, d_model, biasFalse) self.Wv nn.Linear(d_model, d_model, biasFalse) self.post_ffn nn.Sequential( nn.Linear(d_model, hidden), nn.ReLU(), nn.Linear(hidden, d_model), ) def forward(self, robot_xy, task_xy): hr self.robot_enc(robot_xy) # (d,) ht self.task_enc(task_xy) # (N,d) Q self.Wq(hr) # (d,) K self.Wk(ht) # (N,d) V self.Wv(ht) # (N,d) attn_scores (K Q) / math.sqrt(K.shape[-1]) # (N,) a torch.softmax(attn_scores, dim
# (N,) c a V # (d,) u self.post_ffn(c) # (d,) logits (K u) / math.sqrt(K.shape[-1]) # (N,) probs torch.softmax(logits, dim
# (N,) return logits, probs # ------------------------- # Helpers / Env # ------------------------- def sample_tasks(n_tasks3, low-
1
0, high
10.
: xy np.random.uniform(low, high, size(n_tasks,
).astype(np.float
return torch.tensor(xy, devicedevice) def nearest_task_index(robot_xy, task_xy): dists torch.norm(task_xy - robot_xy[None, :], dim
return torch.argmin(dists).long() # ------------------------- # Init # ------------------------- model CrossAttnChooser(d_model32, hidden
.to(device) # ✅ RL 训练建议更小 lr避免 logits 直接推爆导致 probs[1,0,0] opt torch.optim.Adam(model.parameters(), lr2e-
robot_xy torch.tensor([
0,
0], devicedevice) total_steps 100000 print_every 1000 save_every 2000 # running baseline (EMA) baseline
0 beta
02 # logging (EMA) reward_ema
0 reward_beta
02 # ✅ 探索相关熵正则 温度 entropy_coef
01 #
005~
05 可调越大越探索 tau
0 # temperature1 更平滑更探索 # ------------------------- # Load checkpoint if exists # ------------------------- start_step 0 if os.path.exists(MODEL_PATH): print(fLoading checkpoint: {MODEL_PATH}) ckpt torch.load(MODEL_PATH, map_locationdevice) model.load_state_dict(ckpt[model]) # ✅ 切 reward / 调 lr 时强烈建议不要 load optimizer动量会把你推向极端 # opt.load_state_dict(ckpt[optimizer]) # start_step int(ckpt.get(step,
) baseline float(ckpt.get(baseline,
0.
) reward_ema float(ckpt.get(reward_ema,
0.
) else: print(No checkpoint found, training from scratch.) def save_ckpt(step): torch.save({ model: model.state_dict(), optimizer: opt.state_dict(), step: step, baseline: baseline, reward_ema: reward_ema, seed: seed, tau: tau, entropy_coef: entropy_coef, }, MODEL_PATH) print(fCheckpoint saved: step{step} - {MODEL_PATH}) # ------------------------- # Training (REINFORCE baseline entropy) # ------------------------- for step in range(start_step 1, total_steps
: task_xy sample_tasks(
logits, _ model(robot_xy, task_xy) # ✅ 用 logits 构造分布数值更稳并用 temperature 拉平 dist torch.distributions.Categorical(logitslogits / tau) action dist.sample() logp dist.log_prob(action) # ✅ reward负欧式距离建议缩放避免 reward/advantage 过大 r -torch.norm(task_xy[action] - robot_xy) /
1
0 r_item float(r.detach().cpu().item()) baseline (1 - beta) * baseline beta * r_item advantage (r - baseline).detach() # ✅ 熵正则鼓励探索防止 probs 早早变成 [1,0,0] entropy dist.entropy() loss -(logp * advantage) - entropy_coef * entropy opt.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(),
1.