google-deepmind / mujoco

Multi-Joint dynamics with Contact. A general purpose physics simulator.
https://mujoco.org
Apache License 2.0

Camera rendering in MuJoCo - How to take a picture from a simulated camera while keeping the GUI active #1672

Closed vmstavens closed 1 month ago

vmstavens commented 4 months ago

Hello there,

I'm a research assistant and I'm trying to take pictures of my MuJoCo simulation using a simulated camera.

Currently, I have my main rendering loop running:

Main render loop:

```python
def run(self) -> None:
    """
    Method for running the MuJoCo viewer main loop.

    Returns:
    - None.
    """
    with mj.viewer.launch_passive(
        model=self._model, data=self._data, key_callback=self.keyboard_callback
    ) as viewer:
        # load initial state
        load_mj_state(self._model, self._data, "home", self._lock)
        mj.mj_step(self._model, self._data)

        # Toggle site frame visualization.
        if self._args.show_site_frames:
            viewer.opt.frame = mj.mjtFrame.mjFRAME_SITE

        while viewer.is_running():
            step_start = time.time()

            # Step the simulation.
            mj.mj_step(self._model, self._data)
            viewer.sync()

            time_until_next_step = self.dt - (time.time() - step_start)
            if time_until_next_step > 0:
                time.sleep(time_until_next_step)
```

while my `keyboard_callback` function is defined as:

```python
def keyboard_callback(self, key) -> None:
    """
    Method for the keyboard callback. This method should not be called explicitly.

    Parameters:
    - key: Keyboard input.

    Returns:
    - None.
    """
    if key == glfw.KEY_COMMA:
        print(self.cam.image)
```

When I access the `cam.image` attribute, the following code is run:

The `image` getter:

```python
@property
def image(self) -> np.ndarray:
    """Return the captured RGB image."""
    self._renderer.update_scene(self._data, camera=self.name)
    self._img = self._renderer.render()
    self._renderer.update_scene(self._data)
    return self._img
```

Here, `self._renderer` is a `mj.Renderer` object created with `self._renderer = mj.Renderer(self._model, self._height, self._width)`, where `self._model` is the model loaded from the scene file.
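For reference, here is a minimal self-contained sketch of how the renderer works on its own, outside the viewer (the file name `scene.xml` and camera name `cam` are placeholders):

```python
import mujoco as mj

# Load a model and create its data (placeholder scene path).
model = mj.MjModel.from_xml_path("scene.xml")
data = mj.MjData(model)
mj.mj_forward(model, data)  # compute positions so the scene is up to date

# Offscreen renderer, independent of the interactive viewer.
renderer = mj.Renderer(model, height=480, width=640)
renderer.update_scene(data, camera="cam")  # placeholder camera name
rgb = renderer.render()  # (480, 640, 3) uint8 RGB image
```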

After I access the `image` attribute, the GUI becomes unresponsive, while the main rendering loop still seems to be running fine. Ideally, I would like to keep control of the GUI after I have rendered a picture from the camera.

Any feedback or ideas on how I could progress on or solve this problem are very much appreciated!

qgallouedec commented 3 months ago

Same issue.

vmstavens commented 3 months ago

Hi @qgallouedec

I believe I found a solution, so to help out I have written up the following, which you can hopefully use to solve your problem :)

1. Define the camera in the scene file (see the sketch below).
Add camera to scene:

```xml
...
```
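For concreteness, here is a minimal sketch of what such a scene file can look like; the camera name `cam`, its pose, and the geometry are illustrative placeholders, not my actual scene:

```xml
<mujoco>
  <worldbody>
    <!-- A fixed camera named "cam"; fovy is the vertical field of view in degrees. -->
    <camera name="cam" pos="0 -1 0.7" xyaxes="1 0 0 0 0.707 0.707" fovy="58"/>
    <geom type="plane" size="1 1 0.1"/>
    <body name="box" pos="0 0 0.3">
      <freejoint/>
      <geom type="box" size="0.05 0.05 0.05"/>
    </body>
  </worldbody>
</mujoco>
```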
2. Define the camera class; you can just use mine below. In this class I use two utility functions: `make_tf`, which creates spatialmath SE3 homogeneous transformation matrices, and `pcwrite`, which writes point clouds.
Here is my camera class:

```python
import os
from datetime import datetime

import cv2
import mujoco as mj
import numpy as np

# make_tf is defined in the utility functions further down;
# pcwrite (not shown) writes point clouds to disk.


class Camera:
    def __init__(self, args, model, data, cam_name: str = "", save_dir="data/img/"):
        """Initialize Camera instance.

        Args:
        - args: Arguments containing camera width and height.
        - model: Mujoco model.
        - data: Mujoco data.
        - cam_name: Name of the camera.
        - save_dir: Directory to save captured images.
        """
        self._args = args
        self._cam_name = cam_name
        self._model = model
        self._data = data
        self._save_dir = save_dir + self._cam_name + "/"

        self._width = self._args.cam_width
        self._height = self._args.cam_height
        self._cam_id = self._data.cam(self._cam_name).id

        self._renderer = mj.Renderer(self._model, self._height, self._width)
        self._camera = mj.MjvCamera()
        self._scene = mj.MjvScene(self._model, maxgeom=10_000)

        self._image = np.zeros((self._height, self._width, 3), dtype=np.uint8)
        self._depth_image = np.zeros((self._height, self._width, 1), dtype=np.float32)
        self._seg_id_image = np.zeros((self._height, self._width, 3), dtype=np.float32)
        self._point_cloud = np.zeros((self._height, self._width, 1), dtype=np.float32)

        if not os.path.exists(self._save_dir):
            os.makedirs(self._save_dir)

    @property
    def height(self) -> int:
        """
        Get the height of the camera.

        Returns:
                int: The height of the camera.
        """
        return self._height

    @property
    def width(self) -> int:
        """
        Get the width of the camera.

        Returns:
                int: The width of the camera.
        """
        return self._width

    @property
    def save_dir(self) -> str:
        """
        Get the directory where images captured by the camera are saved.

        Returns:
                str: The directory where images captured by the camera are saved.
        """
        return self._save_dir

    @property
    def name(self) -> str:
        """
        Get the name of the camera.

        Returns:
                str: The name of the camera.
        """
        return self._cam_name

    @property
    def K(self) -> np.ndarray:
        """
        Compute the intrinsic camera matrix (K) based on the camera's field of view (fov),
        width (_width), and height (_height) parameters, following the pinhole camera model.

        Returns:
        np.ndarray: The intrinsic camera matrix (K), a 3x3 array representing the camera's intrinsic parameters.
        """
        # Convert the field of view from degrees to radians.
        # Note: MuJoCo's fovy is the *vertical* field of view.
        theta = np.deg2rad(self.fov)

        # Focal length in pixels. MuJoCo renders square pixels, so the
        # horizontal focal length equals the vertical one.
        f_y = (self._height / 2) / np.tan(theta / 2)
        f_x = f_y

        # Pixel resolution (focal length per pixel unit)
        alpha_u = f_x  # focal length in terms of pixel width
        alpha_v = f_y  # focal length in terms of pixel height

        # Optical center offsets (assuming they are at the center of the sensor)
        u_0 = (self._width - 1) / 2.0
        v_0 = (self._height - 1) / 2.0

        # Intrinsic camera matrix K
        K = np.array([[alpha_u, 0, u_0], [0, alpha_v, v_0], [0, 0, 1]])

        return K

    @property
    def T_world_cam(self) -> np.ndarray:
        """
        Compute the homogeneous transformation matrix for the camera.

        The transformation matrix is computed from the camera's position and orientation.
        The position and orientation are retrieved from the camera data.

        Returns:
        np.ndarray: The 4x4 homogeneous transformation matrix representing the camera's pose.
        """
        pos = self._data.cam(self._cam_id).xpos
        rot = self._data.cam(self._cam_id).xmat.reshape(3, 3).T
        T = make_tf(pos=pos, ori=rot).A
        return T

    @property
    def P(self) -> np.ndarray:
        """
        Compute the projection matrix for the camera.

        The projection matrix is computed as the product of the camera's intrinsic
        matrix (K, 3x3) and the top 3x4 block of the homogeneous transformation
        matrix (T_world_cam); multiplying K by the full 4x4 matrix would not be
        dimensionally valid.

        Returns:
        np.ndarray: The 3x4 projection matrix.
        """
        return self.K @ self.T_world_cam[:3, :]

    @property
    def image(self) -> np.ndarray:
        """Return the captured RGB image."""
        self._renderer.update_scene(self._data, camera=self.name)
        self._image = self._renderer.render()
        return self._image

    @property
    def depth_image(self) -> np.ndarray:
        """Return the captured depth image."""
        self._renderer.update_scene(self._data, camera=self.name)
        self._renderer.enable_depth_rendering()
        self._depth_image = self._renderer.render()
        self._renderer.disable_depth_rendering()
        return self._depth_image

    @property
    def seg_image(self) -> np.ndarray:
        """Return the captured segmentation image based on object's id."""
        self._renderer.update_scene(self._data, camera=self.name)
        self._renderer.enable_segmentation_rendering()

        self._seg_id_image = self._renderer.render()[:, :, 0].reshape(
            (self.height, self.width)
        )
        self._renderer.disable_segmentation_rendering()
        return self._seg_id_image

    @property
    def point_cloud(self) -> np.ndarray:
        """Return the captured point cloud."""
        self._point_cloud = self._depth_to_point_cloud(self.depth_image)
        return self._point_cloud

    @property
    def fov(self) -> float:
        """Get the field of view (FOV) of the camera.

        Returns:
        - float: The field of view angle in degrees.
        """
        return self._model.cam(self._cam_id).fovy[0]

    @property
    def id(self) -> int:
        """Get the identifier of the camera.

        Returns:
        - int: The identifier of the camera.
        """
        return self._cam_id

    def _depth_to_point_cloud(self, depth_image: np.ndarray) -> np.ndarray:
        """
        Method to convert depth image to a point cloud in camera coordinates.

        Args:
        - depth_image: The depth image we want to convert to a point cloud.

        Returns:
        - np.ndarray: 3D points in camera coordinates.
        """
        # Get image dimensions
        dimg_shape = depth_image.shape
        height = dimg_shape[0]
        width = dimg_shape[1]

        # Create pixel grid
        y, x = np.meshgrid(np.arange(height), np.arange(width), indexing="ij")

        # Flatten arrays for vectorized computation
        x_flat = x.flatten()
        y_flat = y.flatten()
        depth_flat = depth_image.flatten()

        # Negate depth values because z-axis goes into the camera
        depth_flat = -depth_flat

        # Stack flattened arrays to form homogeneous coordinates
        homogeneous_coords = np.vstack((x_flat, y_flat, np.ones_like(x_flat)))

        # Compute inverse of the intrinsic matrix K
        K_inv = np.linalg.inv(self.K)

        # Calculate 3D points in camera coordinates
        points_camera = np.dot(K_inv, homogeneous_coords) * depth_flat

        # Homogeneous coordinates to 3D points
        points_camera = np.vstack((points_camera, np.ones_like(x_flat)))

        points_camera = points_camera.T

        # dehomogenize
        points_camera = points_camera[:, :3] / points_camera[:, 3][:, np.newaxis]

        return points_camera

    def shoot(self, autosave: bool = True) -> None:
        """
        Captures a new rgb image, depth image and point cloud from the camera.
        Args:
        - autosave: If the camera rgb image, depth image and point cloud should be saved.

        Returns:
        - None.
        """
        self._image = self.image
        self._depth_image = self.depth_image
        self._point_cloud = self.point_cloud
        self._seg_id_image = self.seg_image
        if autosave:
            self.save()

    def save(self, img_name: str = "") -> None:
        """Saves the captured image and depth information.

        Args:
        - img_name: Name for the saved image file.
        """
        print(f"saving rgb image, depth image and point cloud to {self.save_dir}")

        if img_name == "":
            timestamp = datetime.now()
            cv2.imwrite(
                self._save_dir + f"{timestamp}_rgb.png",
                cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR),
            )
            cv2.imwrite(self._save_dir + f"{timestamp}_seg.png", self.seg_image)
            np.save(self._save_dir + f"{timestamp}_depth.npy", self.depth_image)
            pcwrite(self._save_dir + f"{timestamp}.pcd", self.point_cloud)

        else:
            cv2.imwrite(
                self._save_dir + f"{img_name}_rgb.png",
                cv2.cvtColor(self.image, cv2.COLOR_RGB2BGR),
            )
            cv2.imwrite(self._save_dir + f"{img_name}_seg.png", self.seg_image)
            np.save(self._save_dir + f"{img_name}_depth.npy", self.depth_image)
            pcwrite(self._save_dir + f"{img_name}.pcd", self.point_cloud)
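For reference, here is a rough sketch of how the class can be exercised outside the viewer; the scene path, the camera name `cam`, and the `argparse.Namespace` stand-in for my real argument object are placeholders:

```python
import argparse

import mujoco as mj

# Placeholder stand-in for my real argument parsing.
args = argparse.Namespace(cam_width=640, cam_height=480)

model = mj.MjModel.from_xml_path("scene.xml")  # placeholder path
data = mj.MjData(model)
mj.mj_forward(model, data)

cam = Camera(args, model, data, cam_name="cam")
print(cam.K)               # 3x3 intrinsic matrix
print(cam.image.shape)     # (480, 640, 3)
cam.shoot(autosave=False)  # capture rgb, depth, segmentation and point cloud
```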

Here are my utility functions:

```python
from typing import List, Union

import numpy as np
import pandas as pd
import spatialmath as sm
import spatialmath.base as smb


def make_tf(
    pos: Union[np.ndarray, list] = [0, 0, 0],
    ori: Union[np.ndarray, sm.SE3, sm.SO3] = [1, 0, 0, 0],
) -> sm.SE3:
    """
    Create a SE3 transformation matrix.

    Parameters:
    - pos (np.ndarray): Translation vector [x, y, z].
    - ori (Union[np.ndarray, SE3]): Orientation, can be a rotation matrix, quaternion, or SE3 object.

    Returns:
    - SE3: Transformation matrix.
    """
    if isinstance(ori, list):
        ori = np.array(ori)

    if isinstance(ori, sm.SO3):
        ori = ori.R

    if isinstance(pos, sm.SE3):
        pose = pos
        pos = pose.t
        ori = pose.R

    if len(ori) == 9:
        ori = np.reshape(ori, (3, 3))

    # Convert ori to SE3 if it's already a rotation matrix or a quaternion
    if isinstance(ori, np.ndarray):
        if ori.shape == (3, 3):  # Assuming ori is a rotation matrix
            ori = ori
        elif ori.shape == (4,):  # Assuming ori is a quaternion
            ori = sm.UnitQuaternion(s=ori[0], v=ori[1:]).R
        elif ori.shape == (3,):  # Assuming ori is rpy
            ori = sm.SE3.Eul(ori, unit="rad").R

    T_R = smb.r2t(ori) if is_R_valid(ori) else smb.r2t(make_R_valid(ori))
    R = sm.SE3(T_R, check=False).R

    # Combine translation and orientation
    T = sm.SE3.Rt(R=R, t=pos, check=False)

    return T


def is_R_valid(R: np.ndarray, tol: float = 1e-8) -> bool:
    """
    Checks if the input matrix R is a valid 3x3 rotation matrix.

    Parameters:
    R (np.ndarray): The input matrix to be checked.
    tol (float, optional): Tolerance for numerical comparison. Defaults to 1e-8.

    Returns:
    bool: True if R is a valid rotation matrix, False otherwise.

    Raises:
    ValueError: If R is not a 3x3 matrix.
    """
    # Check if R is a 3x3 matrix
    if not isinstance(R, np.ndarray) or R.shape != (3, 3):
        raise ValueError(f"Input is not a 3x3 matrix. R is \n{R}")

    # Check if R is orthogonal
    is_orthogonal = np.allclose(np.dot(R.T, R), np.eye(3), atol=tol)

    # Check if the determinant is 1
    det = np.linalg.det(R)

    return is_orthogonal and np.isclose(det, 1.0, atol=tol)


def make_R_valid(R: np.ndarray, tol: float = 1e-6) -> np.ndarray:
    """
    Makes the input matrix R a valid 3x3 rotation matrix.

    Parameters:
    R (np.ndarray): The input matrix to be made valid.
    tol (float, optional): Tolerance for numerical comparison. Defaults to 1e-6.

    Returns:
    np.ndarray: A valid 3x3 rotation matrix derived from the input matrix R.

    Raises:
    ValueError: If the input is not a 3x3 matrix or if it cannot be made a valid rotation matrix.
    """
    if not isinstance(R, np.ndarray):
        R = np.array(R)

    # Check if R is a 3x3 matrix
    if R.shape != (3, 3):
        raise ValueError("Input is not a 3x3 matrix")

    # Step 1: Gram-Schmidt Orthogonalization
    Q, _ = np.linalg.qr(R)

    # Step 2: Ensure determinant is 1
    det = np.linalg.det(Q)
    if np.isclose(det, 0.0, atol=tol):
        raise ValueError("Invalid rotation matrix (determinant is zero)")

    # Step 3: Ensure determinant is positive
    if det < 0:
        Q[:, 2] *= -1

    return Q
```
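And a quick usage sketch for `make_tf`, just to show the accepted orientation formats (the values are hypothetical):

```python
import numpy as np

# Quaternion (w, x, y, z): identity rotation, translated 0.5 m up.
T1 = make_tf(pos=[0.0, 0.0, 0.5], ori=[1, 0, 0, 0])

# 3x3 rotation matrix input.
T2 = make_tf(pos=[0.1, 0.0, 0.0], ori=np.eye(3))

print(T1.A)  # 4x4 homogeneous matrix as a NumPy array
print(T2)
```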
3. Finally, you need to call the renderer from the main loop. Unfortunately, this means you cannot call it from your keyboard callback. Instead, what I do is append each key event to a queue, which is then popped whenever the main loop ticks again. I tend to package my simulations into classes, so here is a rough sketch of the code you might need.
Here is my `run`:

```python
import queue
import time

import glfw
import mujoco as mj
import mujoco.viewer


class MySimulation:
    ...
    self.cam = Camera(self._args, self._model, self._data, "cam")
    ...

    def keyboard_callback(self, key) -> None:
        """
        Method for the keyboard callback. This method should not be called explicitly.

        Parameters:
        - key: Keyboard input.

        Returns:
        - None.
        """
        if key == glfw.KEY_SPACE:
            pass
        elif key == glfw.KEY_PERIOD:
            print("Shoot! I just took a picture")
            self.cam.shoot()

    def run(self) -> None:
        """
        Method for running the MuJoCo viewer main loop.

        Returns:
        - None.
        """
        # In order to enable camera rendering in the main thread, queue the key events.
        key_queue = queue.Queue()

        with mj.viewer.launch_passive(
            model=self._model,
            data=self._data,
            key_callback=lambda key: key_queue.put(key),
        ) as viewer:
            self.cam._renderer.render()

            while viewer.is_running():
                step_start = time.time()

                # Process queued key events on the main thread.
                while not key_queue.empty():
                    self.keyboard_callback(key_queue.get())

                # Step the simulation.
                mj.mj_step(self._model, self._data)
                viewer.sync()

                time_until_next_step = self.dt - (time.time() - step_start)
                if time_until_next_step > 0:
                    time.sleep(time_until_next_step)
```

I hope this can assist you in solving your problem :)

qgallouedec commented 3 months ago

Thank you!