Payload Logo

Robo Tamer for Qmini 代码解析

Author

Tony Wang

Date Published

train.py

导入模块部分

1import importlib # 用于动态导入模块
2import os # 操作系统接口模块
3from os.path import join # 路径拼接函数
4from env.utils import get_args # 获取命令行参数
5from env.utils.helpers import update_cfg_from_args, class_to_dict, set_seed, parse_sim_params # 辅助函数
6from env import LeggedRobotEnv, GymEnvWrapper # 四足机器人环境和Gym包装器
7from env.tasks import load_task_cls # 加载任务类
8from model import load_actor, load_critic # 加载演员和评论家网络
9from rl.alg import PPO # PPO强化学习算法
10import time # 时间模块
11from collections import deque # 双端队列
12import collections # 集合模块
13import statistics # 统计模块
14from utils.common import clear_dir # 清空目录函数
15from utils.yaml import ParamsProcess # YAML参数处理
16from isaacgym.torch_utils import * # Isaac Gym的PyTorch工具
17from torch.utils.tensorboard import SummaryWriter # TensorBoard日志记录
18import torch # PyTorch深度学习框架

环境变量设置

1# os.environ['CUDA_LAUNCH_BLOCKING'] = '0' # 注释掉的异步CUDA启动
2os.environ['CUDA_LAUNCH_BLOCKING'] = '1' # 设置CUDA同步启动,便于调试

训练函数定义

1def train(): # 定义训练函数

初始化和配置

1 torch.cuda.empty_cache() # 清空CUDA缓存
2 args = get_args() # 获取命令行参数
3 device = args.rl_device # 设置强化学习设备(CPU/GPU)
4 cfg = getattr(importlib.import_module('.'.join(['config', args.config])), args.config) # 动态导入配置文件
5 cfg = update_cfg_from_args(cfg, args) # 用命令行参数更新配置
6 cfg.runner.num_envs = args.num_envs if args.num_envs is not None else cfg.runner.num_envs # 设置环境数量
7 exp_dir = join('experiments', args.name) # 实验目录路径
8 model_dir = join(exp_dir, 'model') # 模型保存目录
9 os.makedirs(model_dir, exist_ok=True) # 创建模型目录
10 all_model_dir = join(exp_dir, 'model', 'all') # 所有模型保存目录
11 os.makedirs(all_model_dir, exist_ok=True) # 创建所有模型目录
12 log_dir = join(exp_dir, 'log') # 日志目录
13 clear_dir(log_dir) # 清空日志目录
14 writer = SummaryWriter(log_dir, flush_secs=10) # 创建TensorBoard写入器
15 num_steps_per_env = cfg.runner.num_steps_per_env # 每个环境的步数
16 num_learning_iterations = cfg.runner.max_iterations # 学习迭代次数
17 set_seed(seed=None) # 设置随机种子

环境和任务初始化

1 sim_params = parse_sim_params(args, class_to_dict(cfg.sim)) # 解析仿真参数
2 env = LeggedRobotEnv(cfg=cfg, # 创建四足机器人环境
3 sim_params=sim_params, # 仿真参数
4 physics_engine=args.physics_engine, # 物理引擎
5 sim_device=args.sim_device, # 仿真设备
6 render=args.render, # 是否渲染
7 fix_cam=args.fix_cam) # 是否固定相机
8 task = load_task_cls(cfg.task.cfg)(env) # 加载并实例化任务类
9 gym_env = GymEnvWrapper(env, task) # 用Gym包装器包装环境
10 task.num_observations = len(gym_env.task.pure_observation()[0]) * gym_env.task.obs_history.maxlen # 计算观察空间维度
11 task.num_actions = len(gym_env.task.action_low) # 计算动作空间维度

配置字典构建

1cfg_dict = collections.OrderedDict() # 创建有序字典
2paramProcess = ParamsProcess() # 创建参数处理器
3cfg_dict.update(paramProcess.class2dict(cfg)) # 将配置转换为字典
4cfg_dict['policy'].update({'num_observations': task.num_observations, 'num_actions': task.num_actions,
5 'num_critic_obs': len(gym_env.task.critic_observation()[0])}) # 更新策略配置
6cfg_dict['action'].update({'action_limit_low': env.dof_pos_limits[:, 0].cpu().numpy(), 'action_limit_up': env.dof_pos_limits[:, 1].cpu().numpy()}) # 更新动作限制
7cfg_dict['action'].update({'action_scale_low': cfg.action.low_ranges[2:], 'action_scale_up': cfg.action.high_ranges[2:]}) # 更新动作缩放
8
9paramProcess.write_param(join(model_dir, "cfg.yaml"), cfg_dict) # 保存配置到YAML文件
10

网络和算法初始化

1 actor = load_actor(cfg_dict['policy'], device).train() # 加载演员网络并设置为训练模式
2 critic = load_critic(cfg_dict['policy'], device).train() # 加载评论家网络并设置为训练模式
3
4
5 alg = PPO(actor, critic, device=device, **class_to_dict(cfg.algorithm)) # 创建PPO算法实例
6 alg.init_storage(cfg.runner.num_envs, num_steps_per_env, [len(gym_env.task.critic_observation()[0])], [task.num_observations], [task.num_actions]) # 初始化存储缓冲区
7 if args.resume is not None: # 如果需要恢复训练
8 resume_model_dir = join(join('experiments', args.resume), 'model') # 恢复模型目录
9 saved_model_state_dict = torch.load(join(resume_model_dir, 'policy.pt')) # 加载保存的模型
10 alg.actor.load_state_dict(saved_model_state_dict['actor']) # 加载演员网络权重
11 alg.critic.load_state_dict(saved_model_state_dict['critic']) # 加载评论家网络权重
12 alg.optimizer.load_state_dict(saved_model_state_dict['optimizer']) # 加载优化器状态
13 current_learning_iteration = saved_model_state_dict['iteration'] # 获取当前迭代次数
14 else:
15 current_learning_iteration = 1 # 从第1次迭代开始

训练变量初始化

1 total_time, total_timesteps = 0., 0 # 总时间和总时间步
2 total_iteration = current_learning_iteration + num_learning_iterations # 总迭代次数
3 rew_buffer, len_buffer,task_rew_buffer = deque(maxlen=100), deque(maxlen=100), deque(maxlen=100) # 奖励、长度、任务奖励缓冲区
4 cur_reward_sum = torch.zeros(cfg.runner.num_envs, dtype=torch.float, device=device) # 当前奖励总和
5 cur_task_rew_sum = torch.zeros(cfg.runner.num_envs, dtype=torch.float, device=device) # 当前任务奖励总和
6 cur_episode_length = torch.zeros(cfg.runner.num_envs, dtype=torch.float, device=device) # 当前回合长度
7
8 obs, cri_obs = gym_env.reset(torch.arange(cfg.runner.num_envs, device=device)) # 重置环境并获取初始观察

主训练循环

1 for it in range(current_learning_iteration, total_iteration): # 主训练循环
2
3 start = time.time() # 记录开始时间
4 for i in range(num_steps_per_env): # 数据收集循环
5 act = alg.act(obs, cri_obs) # 根据观察选择动作
6 obs, cri_obs, rew, done, info, eval_rew = gym_env.step(act,it) # 执行动作并获取新状态
7 alg.process_env_step(rew, done, info) # 处理环境步骤
8 cur_reward_sum += rew # 累加奖励
9 cur_task_rew_sum+=eval_rew # 累加任务奖励
10 cur_episode_length += 1 # 增加回合长度
11 reset_env_ids = (done > 0).nonzero(as_tuple=False)[:, [0]].flatten() # 找到需要重置的环境ID
12 if len(reset_env_ids) > 0: # 如果有环境需要重置
13 rew_buffer.extend(cur_reward_sum[reset_env_ids].cpu().numpy().tolist()) # 记录奖励
14 task_rew_buffer.extend(cur_task_rew_sum[reset_env_ids].cpu().numpy().tolist()) # 记录任务奖励
15 len_buffer.extend(cur_episode_length[reset_env_ids].cpu().numpy().tolist()) # 记录回合长度
16 cur_reward_sum[reset_env_ids] = 0 # 重置奖励计数
17 cur_task_rew_sum[reset_env_ids] = 0 # 重置任务奖励计数
18 cur_episode_length[reset_env_ids] = 0 # 重置回合长度计数

学习和模型保存

1 alg.compute_returns(cri_obs) # 计算回报
2 stop = time.time() # 记录数据收集结束时间
3 collection_time = stop - start # 计算数据收集时间
4 start = stop # 重新开始计时
5 mean_value_loss, mean_surrogate_loss, mean_kl = alg.update() # 更新网络并获取损失
6 saved_model_state_dict = { # 构建保存的模型状态字典
7 'actor': alg.actor.state_dict(), # 演员网络状态
8 'critic': alg.critic.state_dict(), # 评论家网络状态
9 'optimizer': alg.optimizer.state_dict(), # 优化器状态
10 'iteration': current_learning_iteration, # 当前迭代次数
11 }
12 try:
13 torch.save(saved_model_state_dict, join(model_dir, 'policy.pt')) # 保存最新模型
14 except OSError as e:
15 print('Failed to save policy.') # 保存失败提示
16 print(e)
17 if it % cfg.runner.save_interval == 0: # 如果到达保存间隔
18 try:
19 torch.save(saved_model_state_dict, join(all_model_dir, f'policy_{it}.pt')) # 保存带迭代号的模型
20 except OSError as e:
21 print('Failed to save policy.') # 保存失败提示
22 print(e)

性能统计和日志记录

1 stop = time.time() # 记录学习结束时间
2 learn_time = stop - start # 计算学习时间
3 iteration_time = collection_time + learn_time # 计算总迭代时间
4 total_time += iteration_time # 累加总时间
5 total_timesteps += num_steps_per_env * cfg.runner.num_envs # 累加总时间步
6 fps = int(num_steps_per_env * cfg.runner.num_envs / iteration_time) # 计算FPS
7 mean_std = alg.actor.std.mean() # 计算动作标准差均值
8 mean_reward = statistics.mean(rew_buffer) if len(rew_buffer) > 0 else 0. # 计算平均奖励
9 mean_task_reward = statistics.mean(task_rew_buffer) if len(task_rew_buffer) > 0 else 0. # 计算平均任务奖励
10
11 mean_episode_length = statistics.mean(len_buffer) if len(len_buffer) > 0 else 0. # 计算平均回合长度
12 writer.add_scalar('1:Train/mean_reward', mean_reward, it) # 记录平均奖励
13 writer.add_scalar('1:Train/mean_task_reward', mean_task_reward, it) # 记录平均任务奖励
14 writer.add_scalar('1:Train/mean_episode_length', mean_episode_length, it) # 记录平均回合长度
15 writer.add_scalar('1:Train/mean_episode_time', mean_episode_length * gym_env.env.dt, it) # 记录平均回合时间
16
17 writer.add_scalar('2:Loss/value', mean_value_loss, it) # 记录价值损失
18 writer.add_scalar('2:Loss/surrogate', mean_surrogate_loss, it) # 记录代理损失
19 writer.add_scalar('2:Loss/learning_rate', alg.learning_rate, it) # 记录学习率
20 writer.add_scalar('2:Loss/mean_kl', mean_kl, it) # 记录KL散度
21 writer.add_scalar('2:Loss/mean_noise_std', mean_std.item(), it) # 记录噪声标准差
22
23 writer.add_scalar('3:Perf/total_fps', fps, it) # 记录FPS
24 writer.add_scalar('3:Perf/collection_time', collection_time, it) # 记录数据收集时间
25 writer.add_scalar('3:Perf/learning_time', learn_time, it) # 记录学习时间

控制台输出

1 print(f"{args.name}#{it}:", # 打印实验名称和迭代次数
2 f"{'t'} {total_time / 60:.1f}m({iteration_time:.1f}s)", # 总时间(分钟)和迭代时间(秒)
3 f"col {collection_time:.2f}s", # 数据收集时间
4 f"lnt {learn_time:.2f}s", # 学习时间
5 f"nm {fps:.0f}", # FPS(每秒帧数)
6 f"m_kl {mean_kl:.3f}", # 平均KL散度
7 f"{'v_lss:'} {mean_value_loss:.3f}", # 价值损失
8 f"{'a_lss:'} {mean_surrogate_loss:.3f}", # 动作损失
9 # f"l_t {mean_episode_length * gym_env.env.dt:.2f}s", # 注释掉的回合时间
10 f"l_n {int(mean_episode_length)}", # 平均回合长度
11 f"total_rew {mean_reward:.2f}", # 总奖励
12 f"task_rew {mean_task_reward:.2f}", # 任务奖励
13 sep=' ') # 分隔符

主程序入口

1if __name__ == '__main__': # 如果作为主程序运行
2 train() # 调用训练函数

总结

这个文件是一个完整的强化学习训练脚本,专门用于训练四足机器人的控制策略。主要功能包括:

1. 环境设置 :配置CUDA、导入必要模块

2. 参数配置 :处理命令行参数和配置文件

3. 环境初始化 :创建仿真环境和任务

4. 网络初始化 :加载演员-评论家网络

5. 训练循环 :使用 PPO 算法进行强化学习

6. 数据记录 :使用 TensorBoard 记录训练指标

7. 模型保存 :定期保存训练好的模型

8. 性能监控 :实时显示训练进度和性能指标

整个脚本采用了现代强化学习的标准流程,支持断点续训、多环境并行训练、详细的日志记录等功能。