class MujocoPlayer:
    """Replay per-frame input data on a MuJoCo model and notify recorders.

    The player loads a model from an XML file, writes each frame of the
    input data onto the model's ``MjData`` and then either evaluates
    position-dependent quantities only (``'kinematics'`` mode) or steps the
    simulation (``'dynamics'`` mode). Registered recorders are called with
    the model and data at their own, lower-or-equal, output rate.
    """

    # Number of zero-control frames played when no input data is supplied.
    _DEFAULT_NUM_FRAMES = 1000

    def __init__(self, model_path, mode='kinematics', input_data_freq=500,
                 output_path=None, output_prefix=None, input_data=None,
                 init_qpos=None):
        """Initialize MuJoCo player with model and optional recorders.

        Args:
            model_path: Path of the MuJoCo XML model file to load.
            mode: ``'kinematics'`` or ``'dynamics'``; selects how each frame
                is propagated in :meth:`play_trajectory`.
            input_data_freq: Rate of the input data in frames per second.
            output_path: Directory handed to recorders; created if it is
                given and does not exist yet.
            output_prefix: Prefix handed to recorders together with
                ``output_path``.
            input_data: Raw input passed to ``InputDataProcessor``; the
                result of its ``process()`` call is stored as
                ``self.input_data``. Presumably this ends up as a mapping
                from ``MjData`` attribute names to per-frame sequences,
                since that is how :meth:`play_trajectory` consumes it.
            init_qpos: Optional index of the model keyframe whose ``qpos``
                is loaded before playback starts.

        Raises:
            ValueError: If ``mode`` is not one of the two supported values.
        """
        if mode not in ('kinematics', 'dynamics'):
            raise ValueError("Mode must be either 'kinematics' or 'dynamics'")

        self.model = mujoco.MjModel.from_xml_path(model_path)
        self.data = mujoco.MjData(self.model)
        self.mode = mode
        self.input_data_freq = input_data_freq

        self.output_path = output_path
        if output_path:
            os.makedirs(output_path, exist_ok=True)
        self.output_prefix = output_prefix

        self.recorders = []
        data_processor = InputDataProcessor(input_data)
        self.input_data = data_processor.process()
        self.init_qpos = init_qpos

    def add_recorder(self, recorder):
        """Add a recorder to the player.

        The recorder is initialized with the player's output path and prefix
        and will be notified during :meth:`play_trajectory`.

        Args:
            recorder: Object exposing ``output_data_freq``,
                ``initialize(output_path, output_prefix)``,
                ``record_frame(model, data)`` and
                ``save(output_path, output_prefix)``.

        Raises:
            ValueError: If the recorder's output frequency is not positive,
                or if the input data frequency is not an integer multiple
                of it.
        """
        output_freq = recorder.output_data_freq
        # A zero rate would otherwise surface as a bare ZeroDivisionError
        # from the modulo below; a negative rate is meaningless.
        if output_freq <= 0:
            raise ValueError("Recorder output data frequency must be positive")
        if self.input_data_freq % output_freq != 0:
            raise ValueError("Input data frequency must be divisible by recorder output data frequency")

        recorder.initialize(self.output_path, self.output_prefix)
        self.recorders.append(recorder)

    def _substeps_per_frame(self):
        """Return how many simulation steps span one input frame."""
        ratio = 1.0 / (self.model.opt.timestep * self.input_data_freq)
        nearest = round(ratio)
        # Floating-point round-off can leave an exact ratio a hair below the
        # integer; snap to it so truncation does not silently drop a step.
        if abs(ratio - nearest) < 1e-9:
            return int(nearest)
        return int(ratio)

    def play_trajectory(self):
        """Play trajectory and notify all recorders.

        When no input data is available, a block of zero controls
        (``_DEFAULT_NUM_FRAMES`` frames) is played instead. The number of
        frames is taken from the first entry of the data; every other entry
        is indexed with the same frame numbers.

        For each frame, every data entry is written to the ``MjData``
        attribute named by the part of its key after the last ``'.'``. The
        model is then propagated according to ``self.mode`` and each recorder
        is notified on every N-th frame, where N is the ratio of input to
        recorder output frequency.

        Raises:
            ValueError: If ``init_qpos`` is not a valid keyframe index, or if
                in dynamics mode the input data rate is so high that less
                than one simulation step fits into a frame.
        """
        if not self.input_data:
            # No data provided: fall back to zero controls.
            data = {'ctrl': np.zeros((self._DEFAULT_NUM_FRAMES, self.model.nu))}
        else:
            data = self.input_data

        # The first entry defines the length of the trajectory.
        first_key = next(iter(data))
        total_frames = len(data[first_key])

        substeps = self._substeps_per_frame()
        if self.mode == 'dynamics' and substeps < 1:
            # With zero steps per frame the simulation would never advance
            # and every recorded frame would show the initial state.
            raise ValueError(
                f"Input data frequency {self.input_data_freq} is higher than the "
                f"simulation rate (timestep {self.model.opt.timestep}); dynamics "
                "mode needs at least one simulation step per input frame"
            )

        # Initialize with specified key_qpos if provided.
        if self.init_qpos is not None:
            if self.init_qpos < 0 or self.init_qpos >= self.model.nkey:
                raise ValueError(f"init_qpos {self.init_qpos} is out of range. Model has {self.model.nkey} keyframes (0-{self.model.nkey-1})")
            self.data.qpos = self.model.key_qpos[self.init_qpos]

        # Resolve attribute names and recorder strides once instead of on
        # every frame. Attributes are set with setattr rather than eval.
        fields = [(key.split('.')[-1], values) for key, values in data.items()]
        strides = [
            (recorder, int(self.input_data_freq / recorder.output_data_freq))
            for recorder in self.recorders
        ]

        # Main playback loop with progress bar.
        with tqdm(total=total_frames, desc="Playing trajectory", unit="frame") as pbar:
            for i in range(total_frames):
                for attr, values in fields:
                    setattr(self.data, attr, values[i])

                # Forward the simulation.
                if self.mode == 'kinematics':
                    mujoco.mj_fwdPosition(self.model, self.data)
                elif self.mode == 'dynamics':
                    for _ in range(substeps):
                        mujoco.mj_step(self.model, self.data)

                # Notify every recorder whose stride hits this frame.
                for recorder, stride in strides:
                    if i % stride == 0:
                        recorder.record_frame(self.model, self.data)

                pbar.update(1)

    def save_data(self):
        """Save data from all recorders.

        Calls ``save(output_path, output_prefix)`` on every registered
        recorder, in registration order. The prefix is passed through
        unchanged.
        """
        for recorder in self.recorders:
            recorder.save(self.output_path, self.output_prefix)