[docs]classMujocoPlayer:def__init__(self,model_path,mode='kinematics',input_data_freq=500,output_path=None,output_prefix=None):"""Initialize MuJoCo player with model and optional recorders"""ifmodenotin['kinematics','dynamics']:raiseValueError("Mode must be either 'kinematics' or 'dynamics'")self.model=mujoco.MjModel.from_xml_path(model_path)self.data=mujoco.MjData(self.model)self.mode=modeself.input_data_freq=input_data_freqself.output_path=output_pathifoutput_path:os.makedirs(output_path,exist_ok=True)self.output_prefix=output_prefixself.recorders=[]
[docs]defadd_recorder(self,recorder):"""Add a recorder to the player"""ifself.input_data_freq%recorder.output_data_freq!=0:raiseValueError("Input data frequency must be divisible by recorder output data frequency")recorder.initialize(self.output_path,self.output_prefix)self.recorders.append(recorder)
[docs]defplay_trajectory(self,data,input_data_freq):"""Play trajectory and notify all recorders"""# If no data provided, initialize with zeros for ctrlifnotdata:data={'ctrl':np.zeros((1000,self.model.nu))}# Default 100 timestepselse:forkey,valueindata.items():data[key]=np.load(value)# Calculate total frames using the first key in data dictionaryfirst_key=next(iter(data))total_frames=len(range(0,len(data[first_key])))input_time_step=int(1/(self.model.opt.timestep*input_data_freq))# Main playback loop with progress barwithtqdm(total=total_frames,desc="Playing trajectory",unit="frame")aspbar:foriinrange(0,len(data[first_key])):forkey,valueindata.items():# Safely set attributes instead of using evalsetattr(self.data,key,value[i])# Forward the simulationifself.mode=='kinematics':mujoco.mj_fwdPosition(self.model,self.data)elifself.mode=='dynamics':for_inrange(input_time_step):mujoco.mj_step(self.model,self.data)# Notify all recordersforrecorderinself.recorders:output_time_step=int(input_data_freq/recorder.output_data_freq)ifi%output_time_step==0:recorder.record_frame(self.model,self.data)pbar.update(1)
[docs]defsave_data(self):"""Save data from all recorders"""# Add timestamp to output prefixforrecorderinself.recorders:recorder.save(self.output_path,self.output_prefix)