Robo Tamer for Qmini 代码解析
Author
Tony Wang
Date Published
train.py
导入模块部分
1import importlib # 用于动态导入模块2import os # 操作系统接口模块3from os.path import join # 路径拼接函数4from env.utils import get_args # 获取命令行参数5from env.utils.helpers import update_cfg_from_args, class_to_dict, set_seed, parse_sim_params # 辅助函数6from env import LeggedRobotEnv, GymEnvWrapper # 四足机器人环境和Gym包装器7from env.tasks import load_task_cls # 加载任务类8from model import load_actor, load_critic # 加载演员和评论家网络9from rl.alg import PPO # PPO强化学习算法10import time # 时间模块11from collections import deque # 双端队列12import collections # 集合模块13import statistics # 统计模块14from utils.common import clear_dir # 清空目录函数15from utils.yaml import ParamsProcess # YAML参数处理16from isaacgym.torch_utils import * # Isaac Gym的PyTorch工具17from torch.utils.tensorboard import SummaryWriter # TensorBoard日志记录18import torch # PyTorch深度学习框架
环境变量设置
1# os.environ['CUDA_LAUNCH_BLOCKING'] = '0' # 注释掉的异步CUDA启动2os.environ['CUDA_LAUNCH_BLOCKING'] = '1' # 设置CUDA同步启动,便于调试
训练函数定义
1def train(): # 定义训练函数
初始化和配置
1 torch.cuda.empty_cache() # 清空CUDA缓存2 args = get_args() # 获取命令行参数3 device = args.rl_device # 设置强化学习设备(CPU/GPU)4 cfg = getattr(importlib.import_module('.'.join(['config', args.config])), args.config) # 动态导入配置文件5 cfg = update_cfg_from_args(cfg, args) # 用命令行参数更新配置6 cfg.runner.num_envs = args.num_envs if args.num_envs is not None else cfg.runner.num_envs # 设置环境数量7 exp_dir = join('experiments', args.name) # 实验目录路径8 model_dir = join(exp_dir, 'model') # 模型保存目录9 os.makedirs(model_dir, exist_ok=True) # 创建模型目录10 all_model_dir = join(exp_dir, 'model', 'all') # 所有模型保存目录11 os.makedirs(all_model_dir, exist_ok=True) # 创建所有模型目录12 log_dir = join(exp_dir, 'log') # 日志目录13 clear_dir(log_dir) # 清空日志目录14 writer = SummaryWriter(log_dir, flush_secs=10) # 创建TensorBoard写入器15 num_steps_per_env = cfg.runner.num_steps_per_env # 每个环境的步数16 num_learning_iterations = cfg.runner.max_iterations # 学习迭代次数17 set_seed(seed=None) # 设置随机种子
环境和任务初始化
1 sim_params = parse_sim_params(args, class_to_dict(cfg.sim)) # 解析仿真参数2 env = LeggedRobotEnv(cfg=cfg, # 创建四足机器人环境3 sim_params=sim_params, # 仿真参数4 physics_engine=args.physics_engine, # 物理引擎5 sim_device=args.sim_device, # 仿真设备6 render=args.render, # 是否渲染7 fix_cam=args.fix_cam) # 是否固定相机8 task = load_task_cls(cfg.task.cfg)(env) # 加载并实例化任务类9 gym_env = GymEnvWrapper(env, task) # 用Gym包装器包装环境10 task.num_observations = len(gym_env.task.pure_observation()[0]) * gym_env.task.obs_history.maxlen # 计算观察空间维度11 task.num_actions = len(gym_env.task.action_low) # 计算动作空间维度
配置字典构建
1cfg_dict = collections.OrderedDict() # 创建有序字典2paramProcess = ParamsProcess() # 创建参数处理器3cfg_dict.update(paramProcess.class2dict(cfg)) # 将配置转换为字典4cfg_dict['policy'].update({'num_observations': task.num_observations, 'num_actions': task.num_actions,5 'num_critic_obs': len(gym_env.task.critic_observation()[0])}) # 更新策略配置6cfg_dict['action'].update({'action_limit_low': env.dof_pos_limits[:, 0].cpu().numpy(), 'action_limit_up': env.dof_pos_limits[:, 1].cpu().numpy()}) # 更新动作限制7cfg_dict['action'].update({'action_scale_low': cfg.action.low_ranges[2:], 'action_scale_up': cfg.action.high_ranges[2:]}) # 更新动作缩放89paramProcess.write_param(join(model_dir, "cfg.yaml"), cfg_dict) # 保存配置到YAML文件10
网络和算法初始化
1 actor = load_actor(cfg_dict['policy'], device).train() # 加载演员网络并设置为训练模式2 critic = load_critic(cfg_dict['policy'], device).train() # 加载评论家网络并设置为训练模式345 alg = PPO(actor, critic, device=device, **class_to_dict(cfg.algorithm)) # 创建PPO算法实例6 alg.init_storage(cfg.runner.num_envs, num_steps_per_env, [len(gym_env.task.critic_observation()[0])], [task.num_observations], [task.num_actions]) # 初始化存储缓冲区7 if args.resume is not None: # 如果需要恢复训练8 resume_model_dir = join(join('experiments', args.resume), 'model') # 恢复模型目录9 saved_model_state_dict = torch.load(join(resume_model_dir, 'policy.pt')) # 加载保存的模型10 alg.actor.load_state_dict(saved_model_state_dict['actor']) # 加载演员网络权重11 alg.critic.load_state_dict(saved_model_state_dict['critic']) # 加载评论家网络权重12 alg.optimizer.load_state_dict(saved_model_state_dict['optimizer']) # 加载优化器状态13 current_learning_iteration = saved_model_state_dict['iteration'] # 获取当前迭代次数14 else:15 current_learning_iteration = 1 # 从第1次迭代开始
训练变量初始化
1 total_time, total_timesteps = 0., 0 # 总时间和总时间步2 total_iteration = current_learning_iteration + num_learning_iterations # 总迭代次数3 rew_buffer, len_buffer,task_rew_buffer = deque(maxlen=100), deque(maxlen=100), deque(maxlen=100) # 奖励、长度、任务奖励缓冲区4 cur_reward_sum = torch.zeros(cfg.runner.num_envs, dtype=torch.float, device=device) # 当前奖励总和5 cur_task_rew_sum = torch.zeros(cfg.runner.num_envs, dtype=torch.float, device=device) # 当前任务奖励总和6 cur_episode_length = torch.zeros(cfg.runner.num_envs, dtype=torch.float, device=device) # 当前回合长度78 obs, cri_obs = gym_env.reset(torch.arange(cfg.runner.num_envs, device=device)) # 重置环境并获取初始观察
主训练循环
1 for it in range(current_learning_iteration, total_iteration): # 主训练循环23 start = time.time() # 记录开始时间4 for i in range(num_steps_per_env): # 数据收集循环5 act = alg.act(obs, cri_obs) # 根据观察选择动作6 obs, cri_obs, rew, done, info, eval_rew = gym_env.step(act,it) # 执行动作并获取新状态7 alg.process_env_step(rew, done, info) # 处理环境步骤8 cur_reward_sum += rew # 累加奖励9 cur_task_rew_sum+=eval_rew # 累加任务奖励10 cur_episode_length += 1 # 增加回合长度11 reset_env_ids = (done > 0).nonzero(as_tuple=False)[:, [0]].flatten() # 找到需要重置的环境ID12 if len(reset_env_ids) > 0: # 如果有环境需要重置13 rew_buffer.extend(cur_reward_sum[reset_env_ids].cpu().numpy().tolist()) # 记录奖励14 task_rew_buffer.extend(cur_task_rew_sum[reset_env_ids].cpu().numpy().tolist()) # 记录任务奖励15 len_buffer.extend(cur_episode_length[reset_env_ids].cpu().numpy().tolist()) # 记录回合长度16 cur_reward_sum[reset_env_ids] = 0 # 重置奖励计数17 cur_task_rew_sum[reset_env_ids] = 0 # 重置任务奖励计数18 cur_episode_length[reset_env_ids] = 0 # 重置回合长度计数
学习和模型保存
1 alg.compute_returns(cri_obs) # 计算回报2 stop = time.time() # 记录数据收集结束时间3 collection_time = stop - start # 计算数据收集时间4 start = stop # 重新开始计时5 mean_value_loss, mean_surrogate_loss, mean_kl = alg.update() # 更新网络并获取损失6 saved_model_state_dict = { # 构建保存的模型状态字典7 'actor': alg.actor.state_dict(), # 演员网络状态8 'critic': alg.critic.state_dict(), # 评论家网络状态9 'optimizer': alg.optimizer.state_dict(), # 优化器状态10 'iteration': current_learning_iteration, # 当前迭代次数11 }12 try:13 torch.save(saved_model_state_dict, join(model_dir, 'policy.pt')) # 保存最新模型14 except OSError as e:15 print('Failed to save policy.') # 保存失败提示16 print(e)17 if it % cfg.runner.save_interval == 0: # 如果到达保存间隔18 try:19 torch.save(saved_model_state_dict, join(all_model_dir, f'policy_{it}.pt')) # 保存带迭代号的模型20 except OSError as e:21 print('Failed to save policy.') # 保存失败提示22 print(e)
性能统计和日志记录
1 stop = time.time() # 记录学习结束时间2 learn_time = stop - start # 计算学习时间3 iteration_time = collection_time + learn_time # 计算总迭代时间4 total_time += iteration_time # 累加总时间5 total_timesteps += num_steps_per_env * cfg.runner.num_envs # 累加总时间步6 fps = int(num_steps_per_env * cfg.runner.num_envs / iteration_time) # 计算FPS7 mean_std = alg.actor.std.mean() # 计算动作标准差均值8 mean_reward = statistics.mean(rew_buffer) if len(rew_buffer) > 0 else 0. # 计算平均奖励9 mean_task_reward = statistics.mean(task_rew_buffer) if len(task_rew_buffer) > 0 else 0. # 计算平均任务奖励1011 mean_episode_length = statistics.mean(len_buffer) if len(len_buffer) > 0 else 0. # 计算平均回合长度12 writer.add_scalar('1:Train/mean_reward', mean_reward, it) # 记录平均奖励13 writer.add_scalar('1:Train/mean_task_reward', mean_task_reward, it) # 记录平均任务奖励14 writer.add_scalar('1:Train/mean_episode_length', mean_episode_length, it) # 记录平均回合长度15 writer.add_scalar('1:Train/mean_episode_time', mean_episode_length * gym_env.env.dt, it) # 记录平均回合时间1617 writer.add_scalar('2:Loss/value', mean_value_loss, it) # 记录价值损失18 writer.add_scalar('2:Loss/surrogate', mean_surrogate_loss, it) # 记录代理损失19 writer.add_scalar('2:Loss/learning_rate', alg.learning_rate, it) # 记录学习率20 writer.add_scalar('2:Loss/mean_kl', mean_kl, it) # 记录KL散度21 writer.add_scalar('2:Loss/mean_noise_std', mean_std.item(), it) # 记录噪声标准差2223 writer.add_scalar('3:Perf/total_fps', fps, it) # 记录FPS24 writer.add_scalar('3:Perf/collection_time', collection_time, it) # 记录数据收集时间25 writer.add_scalar('3:Perf/learning_time', learn_time, it) # 记录学习时间
控制台输出
1 print(f"{args.name}#{it}:", # 打印实验名称和迭代次数2 f"{'t'} {total_time / 60:.1f}m({iteration_time:.1f}s)", # 总时间(分钟)和迭代时间(秒)3 f"col {collection_time:.2f}s", # 数据收集时间4 f"lnt {learn_time:.2f}s", # 学习时间5 f"nm {fps:.0f}", # FPS(每秒帧数)6 f"m_kl {mean_kl:.3f}", # 平均KL散度7 f"{'v_lss:'} {mean_value_loss:.3f}", # 价值损失8 f"{'a_lss:'} {mean_surrogate_loss:.3f}", # 动作损失9 # f"l_t {mean_episode_length * gym_env.env.dt:.2f}s", # 注释掉的回合时间10 f"l_n {int(mean_episode_length)}", # 平均回合长度11 f"total_rew {mean_reward:.2f}", # 总奖励12 f"task_rew {mean_task_reward:.2f}", # 任务奖励13 sep=' ') # 分隔符
主程序入口
1if __name__ == '__main__': # 如果作为主程序运行2 train() # 调用训练函数
总结
这个文件是一个完整的强化学习训练脚本,专门用于训练四足机器人的控制策略。主要功能包括:
1. 环境设置 :配置CUDA、导入必要模块
2. 参数配置 :处理命令行参数和配置文件
3. 环境初始化 :创建仿真环境和任务
4. 网络初始化 :加载演员-评论家网络
5. 训练循环 :使用 PPO 算法进行强化学习
6. 数据记录 :使用 TensorBoard 记录训练指标
7. 模型保存 :定期保存训练好的模型
8. 性能监控 :实时显示训练进度和性能指标
整个脚本采用了现代强化学习的标准流程,支持断点续训、多环境并行训练、详细的日志记录等功能。