Source code for mujoco_tools.recorder

import mujoco
import mujoco.viewer
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
from pathlib import Path
import imageio
from tqdm import tqdm

[docs]class VideoRecorder: def __init__(self, camera_name='lateral_camera_angle', width=1920, height=1080, fps=50, vision_flags=None, output_video_freq=50, activation_map=False, activation_shape=(35, 20)): """Initialize video recorder with rendering settings Args: camera_name: Camera name or space-separated camera names width: Video width height: Video height fps: Frames per second vision_flags: MuJoCo vision flags output_video_freq: Output video frequency activation_map: Whether to include activation visualization in the recording activation_shape: Shape to use for activation visualization (rows, cols) """ # Convert camera_name string to list if space-separated self.camera_names = camera_name.split() if isinstance(camera_name, str) else camera_name self.fps = fps self.output_data_freq = output_video_freq # Width per camera will be total width divided by number of cameras self.camera_width = width self.camera_height = height self.vision_flags = vision_flags self.setup_renderer(self.camera_width, self.camera_height) # Initialize video writer as None self.video_writer = None self.output_path = None # Activation map settings self.activation_map = activation_map self.activation_shape = activation_shape
[docs] def setup_renderer(self, camera_width, camera_height): """Set up the MuJoCo renderer with given settings""" self.video_height = camera_height self.video_width = camera_width * len(self.camera_names) self.rgb_renderer = None self.scene_option = mujoco.MjvOption() for flag, value in self.vision_flags.items(): self.scene_option.flags[getattr(mujoco.mjtVisFlag, flag)] = value
[docs] def initialize(self, output_path, output_prefix): """Initialize video writer""" self.output_path = f'{output_path}/{output_prefix}_video.mp4' frame_size = (self.video_width, self.video_height) fourcc = cv2.VideoWriter_fourcc(*'mp4v') self.video_writer = cv2.VideoWriter(self.output_path, fourcc, self.fps, frame_size) print(f"Started recording to {self.output_path}")
[docs] def record_frame(self, model, data): """Record frames from multiple cameras and concatenate them Args: model: MuJoCo model data: MuJoCo data """ frames = [] if self.rgb_renderer is None: self.rgb_renderer = mujoco.Renderer(model, width=self.camera_width, height=self.camera_height) for camera_name in self.camera_names: # Create scene and camera self.rgb_renderer.update_scene(data, camera=camera_name, scene_option=self.scene_option) frame = self.rgb_renderer.render() frames.append(frame) # Concatenate frames horizontally combined_frame = np.concatenate(frames, axis=1) # Add activation map if enabled if self.activation_map: combined_frame = self._add_activation_map(combined_frame, data.act) # Convert from RGB to BGR for OpenCV combined_frame = cv2.cvtColor(combined_frame, cv2.COLOR_RGB2BGR) self.video_writer.write(combined_frame)
def _add_activation_map(self, sim_frame, activation): """Add activation map visualization to the simulation frame Args: sim_frame: Simulation frame (numpy array) activation: Activation array to visualize Returns: numpy.ndarray: Combined frame with activation visualization """ # Make a copy of the activation array activation = activation.copy() # Calculate the total elements in the target shape target_elements = self.activation_shape[0] * self.activation_shape[1] # If the activation array is smaller than the target shape, pad with zeros if activation.size < target_elements: # Create a new array with the right number of elements, filled with zeros padded_activation = np.zeros(target_elements) # Copy the actual activation values to the beginning of the padded array padded_activation[:activation.size] = activation.flatten() # Use the padded activation for visualization activation = padded_activation # Generate activation visualization activation_frame = self.visualize_activation(activation, shape=self.activation_shape) # Resize activation frame to match the height of the original frame # while preserving aspect ratio act_height = sim_frame.shape[0] act_aspect_ratio = activation_frame.shape[1] / activation_frame.shape[0] act_width = int(act_height * act_aspect_ratio) # Force act_width to be even act_width = act_width if act_width % 2 == 0 else act_width + 1 activation_frame_resized = cv2.resize(activation_frame, (act_width, act_height)) # Calculate how much space we have for the simulation frame total_width = self.video_width sim_width = total_width - act_width # If the simulation frame is wider than the available space, crop it from the center if sim_frame.shape[1] > sim_width: # Crop from the center center_x = sim_frame.shape[1] // 2 half_width = sim_width // 2 left_bound = max(0, center_x - half_width) right_bound = min(sim_frame.shape[1], center_x + half_width) # Extract the center region sim_frame = sim_frame[:, left_bound:right_bound] # Concatenate with the activation visualization combined_frame = np.concatenate([sim_frame, activation_frame_resized], axis=1) return combined_frame
[docs] def save(self, output_path, output_prefix='video'): """Finish recording and release video writer""" if self.video_writer is not None: self.video_writer.release() self.video_writer = None print(f"Video saved successfully to {self.output_path}")
[docs] def visualize_activation(self, activation, shape=(35, 20), title='Muscle Activation', figsize=(20, 35), dpi=30.86): """Convert muscle activation array into a visualization image Args: activation: Numpy array of activation values (will be reshaped to shape) shape: Tuple (rows, cols) to reshape the activation array title: Title for the visualization figsize: Figure size in inches dpi: Dots per inch for the figure Returns: numpy.ndarray: RGB image of the visualization """ import matplotlib.pyplot as plt plt.figure(layout='constrained', facecolor='white', figsize=figsize, dpi=dpi) # Define the color gradient: light pink to muscle red actuator_color = np.array([255, 227, 224]) / 255.0 # Light pink muscle_color = np.array([239, 99, 81]) / 255.0 # Muscle red cmap = plt.cm.colors.LinearSegmentedColormap.from_list("custom_cmap", [actuator_color, muscle_color]) # Set up the plot plt.imshow(activation.reshape(shape), cmap=cmap, vmax=1, vmin=0) # Remove ticks and labels plt.gca().tick_params(axis='both', which='both', bottom=False, top=False, labelbottom=False, right=False, left=False, labelleft=False) # Add grid plt.grid(True, which='both', axis='both', color='black', linestyle='-', linewidth=4) # Set grid lines to be in between cells plt.gca().set_xticks(np.arange(-0.5, shape[1] + 0.5, 1) - 0.005) plt.gca().set_yticks(np.arange(-0.5, shape[0] + 0.5, 1) - 0.01) plt.gca().set_axisbelow(False) plt.title(title, fontsize=96, fontweight='bold') # Render figure to numpy array fig = plt.gcf() fig.canvas.draw() img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8) img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,)) plt.close() return img
[docs]class StateRecorder: def __init__(self, model, output_format='npy', datatypes=None, output_data_freq=50): """Initialize state recorder for positions and orientations""" self.datatypes = datatypes self.output_format = output_format self.output_data_freq = output_data_freq self.reset()
[docs] def initialize(self, output_path, output_prefix): """Initialize state recorder""" self.output_path = output_path self.output_prefix = output_prefix
[docs] def reset(self): """Reset recorded data""" self.recorded_data = {datatype: [] for datatype in self.datatypes}
[docs] def record_frame(self, model, data): """Record position and orientation data for the current frame""" for datatype in self.datatypes: self.recorded_data[datatype].append(getattr(data, datatype).copy())
[docs] def tendon_waypoint(self, model, data): """ Record tendon waypoint xpos ten_wrapadr is the start address of tendon's path ten_wrapnum is the number of wrap points in path wrap_xpos is the Cartesian 3D points in all paths return: waypoint_xpos: list of numpy arrays, (ntendon, num_waypoints, 6) """ ten_wrapadr = data.ten_wrapadr ten_wrapnum = data.ten_wrapnum wrap_xpos = data.wrap_xpos.reshape(-1,3) waypoint_xpos = [] for i in range(model.ntendon): start = ten_wrapadr[i] end = start + ten_wrapnum[i] waypoint_xpos.append(wrap_xpos[start:end,:]) return waypoint_xpos
[docs] def save(self, output_path, output_prefix='state'): """Save recorded state data""" for datatype in self.datatypes: np.save(f'{self.output_path}/{self.output_prefix}_{datatype}.npy', np.array(self.recorded_data[datatype]))