Skip to content

Cameras

Abstract interface

Abstract camera interface for recording and conversion

Camera

Bases: ABC

Abstract base class for cameras.

Supports two usage modes: - Live recording: open() → start_recording(path) → grab() loop → stop_recording() → close() - File playback: open_file(path) → grab() loop → get_frame() → close()

Source code in raiden/cameras/base.py
class Camera(ABC):
    """Common interface implemented by every supported camera backend.

    Two workflows are supported:

    * Live recording:
      ``open()`` → ``start_recording(path)`` → ``grab()`` loop →
      ``stop_recording()`` → ``close()``
    * File playback:
      ``open_file(path)`` → ``grab()`` loop → ``get_frame()`` → ``close()``
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Semantic label assigned to this camera (e.g. 'scene_camera')"""
        ...

    @property
    @abstractmethod
    def serial_number(self) -> str:
        """Hardware serial number of the camera, as a string"""
        ...

    @property
    @abstractmethod
    def recording_extension(self) -> str:
        """Extension of the native recording format (e.g. 'svo2', 'bag')"""
        ...

    @abstractmethod
    def open(self) -> None:
        """Open live camera connection"""
        ...

    @abstractmethod
    def close(self) -> None:
        """Close camera and release all resources"""
        ...

    @abstractmethod
    def start_recording(self, path: Path) -> None:
        """Begin recording to file.

        On ZED this enables SVO2 recording, so each later grab() is stored;
        on RealSense this starts a .bag recorder.
        """
        ...

    @abstractmethod
    def stop_recording(self) -> None:
        """Stop recording and finalize the output file"""
        ...

    @abstractmethod
    def grab(self) -> bool:
        """Grab the next available frame.

        Blocks until a frame is ready (i.e. paced at the camera FPS).
        If recording is active, the grabbed frame is also written to disk.

        Returns:
            True on success, False on error or end-of-file
        """
        ...

    @abstractmethod
    def get_frame(self) -> CameraFrame:
        """Retrieve color and depth for the most recently grabbed frame.

        Only valid to call after a successful grab().
        """
        ...

    @abstractmethod
    def get_intrinsics(self) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
        """Return factory-calibrated camera intrinsics.

        Values are read straight from the camera SDK (ZED factory
        calibration, RealSense on-device calibration) rather than estimated
        from images.

        Returns:
            camera_matrix: 3x3 float64 array ``[[fx,0,cx],[0,fy,cy],[0,0,1]]``
            dist_coeffs: 1-D float64 array ``[k1, k2, p1, p2, k3]`` (OpenCV Brown-Conrady)
            image_size: ``(width, height)`` in pixels
        """
        ...

    def get_current_timestamp_ns(self) -> int:
        """Return the current host time in nanoseconds.

        The default is ``time.time_ns()``.  A subclass whose frames are
        stamped by its own hardware clock (e.g. ZED) should override this so
        the returned value is on the *same clock* as
        ``CameraFrame.timestamp_ns``, which makes robot data directly
        interpolatable against camera frames.
        """
        from time import time_ns

        return time_ns()

name abstractmethod property

Semantic name assigned to this camera (e.g. 'scene_camera')

recording_extension abstractmethod property

File extension used for native recording (e.g. 'svo2', 'bag')

serial_number abstractmethod property

Camera hardware serial number as string

close() abstractmethod

Close camera and release all resources

Source code in raiden/cameras/base.py
@abstractmethod
def close(self) -> None:
    """Close camera and release all resources"""
    ...

get_current_timestamp_ns()

Current host time in nanoseconds.

The default implementation uses time.time_ns(). Camera subclasses that have their own hardware clock (e.g. ZED) should override this so that the returned value is on the same clock as CameraFrame.timestamp_ns, enabling direct interpolation between robot data and camera frames.

Source code in raiden/cameras/base.py
def get_current_timestamp_ns(self) -> int:
    """Current host time in nanoseconds.

    The default implementation uses ``time.time_ns()``.  Camera subclasses
    that have their own hardware clock (e.g. ZED) should override this so
    that the returned value is on the *same clock* as ``CameraFrame.timestamp_ns``,
    enabling direct interpolation between robot data and camera frames.
    """
    import time

    return time.time_ns()

get_frame() abstractmethod

Retrieve color and depth from the most recently grabbed frame.

Only valid to call after a successful grab().

Source code in raiden/cameras/base.py
@abstractmethod
def get_frame(self) -> CameraFrame:
    """Retrieve color and depth from the most recently grabbed frame.

    Only valid to call after a successful grab().
    """
    ...

get_intrinsics() abstractmethod

Return factory-calibrated camera intrinsics.

These come directly from the camera SDK (ZED factory calibration, RealSense on-device calibration) rather than being computed from images.

Returns:

Name Type Description
camera_matrix ndarray

3x3 float64 array [[fx,0,cx],[0,fy,cy],[0,0,1]]

dist_coeffs ndarray

1-D float64 array [k1, k2, p1, p2, k3] (OpenCV Brown-Conrady)

image_size Tuple[int, int]

(width, height) in pixels

Source code in raiden/cameras/base.py
@abstractmethod
def get_intrinsics(self) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
    """Return factory-calibrated camera intrinsics.

    These come directly from the camera SDK (ZED factory calibration,
    RealSense on-device calibration) rather than being computed from images.

    Returns:
        camera_matrix: 3x3 float64 array ``[[fx,0,cx],[0,fy,cy],[0,0,1]]``
        dist_coeffs: 1-D float64 array ``[k1, k2, p1, p2, k3]`` (OpenCV Brown-Conrady)
        image_size: ``(width, height)`` in pixels
    """
    ...

grab() abstractmethod

Grab the next available frame.

Blocks until a frame is ready (rate-limited to camera FPS). While recording is active, the frame is also written to the output file.

Returns:

Type Description
bool

True on success, False on error or end-of-file

Source code in raiden/cameras/base.py
@abstractmethod
def grab(self) -> bool:
    """Grab the next available frame.

    Blocks until a frame is ready (rate-limited to camera FPS).
    While recording is active, the frame is also written to the output file.

    Returns:
        True on success, False on error or end-of-file
    """
    ...

open() abstractmethod

Open live camera connection

Source code in raiden/cameras/base.py
@abstractmethod
def open(self) -> None:
    """Open live camera connection"""
    ...

start_recording(path) abstractmethod

Begin recording to file.

For ZED this enables SVO2 recording; every subsequent grab() is stored. For RealSense this starts a .bag recorder.

Source code in raiden/cameras/base.py
@abstractmethod
def start_recording(self, path: Path) -> None:
    """Begin recording to file.

    For ZED this enables SVO2 recording; every subsequent grab() is stored.
    For RealSense this starts a .bag recorder.
    """
    ...

stop_recording() abstractmethod

Stop recording and finalize the output file

Source code in raiden/cameras/base.py
@abstractmethod
def stop_recording(self) -> None:
    """Stop recording and finalize the output file"""
    ...

CameraFrame dataclass

A single camera frame

Source code in raiden/cameras/base.py
@dataclass
class CameraFrame:
    """A single camera frame"""

    color: np.ndarray  # HxWx3 BGR uint8
    depth: Optional[np.ndarray]  # HxW float32 meters, None if unavailable
    timestamp_ns: int  # nanosecond timestamp (camera clock)

ZED camera

ZED stereo camera implementation using the pyzed SDK.

Recording mode: opens by serial number, records to SVO2 (H264).

Depth is NOT computed during recording (depth_mode=NONE) for lower CPU usage; the raw stereo pair is stored instead.

Playback mode: opens an SVO2 file, computes depth (NEURAL_LIGHT) for each frame. Use ZedCamera.from_svo() to create a playback instance.

ZedCamera

Bases: Camera

ZED camera – records to .svo2

Source code in raiden/cameras/zed.py
class ZedCamera(Camera):
    """ZED camera – records to .svo2

    Live mode (``__init__`` + ``open()``) disables SDK depth for low
    recording overhead; playback mode (``from_svo()``) optionally computes
    depth with the NEURAL_LIGHT model.
    """

    def __init__(self, camera_name: str, serial_number: int, fps: int = 30):
        """Create a live-capture instance. No device I/O happens until open().

        Args:
            camera_name: Semantic name assigned to this camera.
            serial_number: ZED hardware serial number (0 for playback instances).
            fps: Frame rate requested from the camera.
        """
        self._name = camera_name
        self._serial = serial_number
        self._fps = fps
        self._camera = sl.Camera()
        self._is_open = False
        # Reusable SDK objects so grab()/get_frame() don't allocate per call.
        self._runtime_params = sl.RuntimeParameters()
        self._image = sl.Mat()
        self._depth = sl.Mat()
        self._right_image = sl.Mat()
        self._has_depth = False  # True only in playback mode

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        return self._name

    @property
    def serial_number(self) -> str:
        return str(self._serial)

    @property
    def recording_extension(self) -> str:
        return "svo2"

    # ------------------------------------------------------------------
    # Live recording
    # ------------------------------------------------------------------

    def open(self) -> None:
        """Open live camera. Depth is disabled to reduce recording overhead.

        Raises:
            RuntimeError: If the SDK fails to open the camera.
        """
        init_params = sl.InitParameters()
        init_params.set_from_serial_number(self._serial)
        init_params.camera_resolution = sl.RESOLUTION.HD720
        init_params.camera_fps = self._fps
        init_params.depth_mode = sl.DEPTH_MODE.NONE
        init_params.coordinate_units = sl.UNIT.METER

        status = self._camera.open(init_params)
        if status != sl.ERROR_CODE.SUCCESS:
            raise RuntimeError(
                f"Failed to open ZED camera '{self._name}' "
                f"(serial: {self._serial}): {status}"
            )
        self._is_open = True
        self._has_depth = False

    def close(self) -> None:
        """Close camera and release SDK resources (idempotent)."""
        if self._is_open:
            self._camera.close()
            self._is_open = False

    def start_recording(self, path: Path) -> None:
        """Enable SVO2 recording. Every subsequent grab() writes a frame.

        Raises:
            RuntimeError: If the SDK cannot start the recorder.
        """
        params = sl.RecordingParameters()
        params.compression_mode = sl.SVO_COMPRESSION_MODE.H264
        params.video_filename = str(path)
        status = self._camera.enable_recording(params)
        if status != sl.ERROR_CODE.SUCCESS:
            raise RuntimeError(
                f"Failed to start SVO2 recording for '{self._name}': {status}"
            )

    def stop_recording(self) -> None:
        """Stop recording and finalize the SVO2 file."""
        self._camera.disable_recording()

    def grab(self) -> bool:
        """Grab next frame (blocks until ready at camera FPS).

        Returns:
            True on success, False on error or end-of-file.
        """
        status = self._camera.grab(self._runtime_params)
        return status == sl.ERROR_CODE.SUCCESS

    def get_frame(self) -> CameraFrame:
        """Retrieve color (and depth if in playback mode) from last grab."""
        self._camera.retrieve_image(self._image, sl.VIEW.LEFT)
        color = self._image.get_data()[:, :, :3].copy()  # drop alpha

        depth: Optional[np.ndarray] = None
        if self._has_depth:
            self._camera.retrieve_measure(self._depth, sl.MEASURE.DEPTH)
            raw = self._depth.get_data().copy()
            # SDK reports NaN/inf for invalid pixels; zero them out.
            depth = np.where(np.isfinite(raw), raw, 0.0).astype(np.float32)

        timestamp_ns = self._camera.get_timestamp(
            sl.TIME_REFERENCE.IMAGE
        ).get_nanoseconds()

        return CameraFrame(color=color, depth=depth, timestamp_ns=timestamp_ns)

    # ------------------------------------------------------------------
    # Playback from SVO2 file
    # ------------------------------------------------------------------

    @classmethod
    def from_svo(
        cls,
        camera_name: str,
        svo_path: Path,
        compute_sdk_depth: bool = True,
    ) -> "ZedCamera":
        """Open an SVO2 file for frame-by-frame extraction.

        Parameters
        ----------
        compute_sdk_depth : bool
            If True (default) depth is computed by the ZED SDK (NEURAL_LIGHT).
            Set to False when you will compute depth externally (e.g. with FFS);
            this skips the SDK depth pass for lower CPU/GPU load.

        Raises
        ------
        RuntimeError
            If the SVO2 file cannot be opened.
        """
        # Reuse __init__ for field setup instead of duplicating it via
        # __new__; serial 0 marks a playback instance with no physical device.
        cam = cls(camera_name, serial_number=0)
        cam._has_depth = compute_sdk_depth

        init_params = sl.InitParameters()
        init_params.set_from_svo_file(str(svo_path))
        init_params.svo_real_time_mode = False  # process all frames
        init_params.depth_mode = (
            sl.DEPTH_MODE.NEURAL_LIGHT if compute_sdk_depth else sl.DEPTH_MODE.NONE
        )
        init_params.coordinate_units = sl.UNIT.METER
        init_params.depth_minimum_distance = 0.01

        status = cam._camera.open(init_params)
        if status != sl.ERROR_CODE.SUCCESS:
            raise RuntimeError(f"Failed to open SVO2 '{svo_path}': {status}")

        cam._is_open = True
        return cam

    def _left_calibration(self):
        """Return ``(camera_information, left_cam calibration, resolution)``.

        Internal helper shared by get_intrinsics() and get_camera_info().
        """
        info = self._camera.get_camera_information()
        config = info.camera_configuration
        return info, config.calibration_parameters.left_cam, config.resolution

    def get_intrinsics(self) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
        """Return ZED SDK factory-calibrated intrinsics for the left lens."""
        _, cal, res = self._left_calibration()

        camera_matrix = np.array(
            [[cal.fx, 0.0, cal.cx], [0.0, cal.fy, cal.cy], [0.0, 0.0, 1.0]],
            dtype=np.float64,
        )
        # ZED disto order: k1, k2, p1, p2, k3
        dist_coeffs = np.array(
            [cal.disto[0], cal.disto[1], cal.disto[2], cal.disto[3], cal.disto[4]],
            dtype=np.float64,
        )
        image_size = (res.width, res.height)
        return camera_matrix, dist_coeffs, image_size

    def get_right_color(self) -> np.ndarray:
        """Return the right-camera BGR image for the most recently grabbed frame.

        Only valid after a successful grab(). Used by external stereo depth
        predictors (e.g. Fast Foundation Stereo) that need both views.
        """
        self._camera.retrieve_image(self._right_image, sl.VIEW.RIGHT)
        return self._right_image.get_data()[:, :, :3].copy()

    def get_stereo_calib(self) -> "tuple[float, float]":
        """Return ``(fx, baseline_m)`` for the left camera at the current resolution.

        ``fx`` is the left-camera focal length in pixels and ``baseline_m`` is
        the stereo baseline in metres. Used to convert FFS disparity to depth.
        """
        info = self._camera.get_camera_information()
        cal = info.camera_configuration.calibration_parameters
        fx = float(cal.left_cam.fx)
        baseline = float(abs(cal.get_camera_baseline()))
        return fx, baseline

    def get_total_frames(self) -> int:
        """Total frame count (valid for SVO2 playback only)."""
        return self._camera.get_svo_number_of_frames()

    def get_frame_timestamp_ns(self) -> int:
        """Timestamp of the most recently grabbed frame in nanoseconds (ZED hardware clock)."""
        return self._camera.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()

    def get_current_timestamp_ns(self) -> int:
        """Current host time in nanoseconds via the ZED SDK.

        On the same clock as ``get_frame_timestamp_ns()``, so robot observations
        recorded with this value can be directly interpolated against camera
        frame timestamps.
        """
        return self._camera.get_timestamp(sl.TIME_REFERENCE.CURRENT).get_nanoseconds()

    def get_camera_info(self) -> dict:
        """Return camera intrinsics and metadata as a plain dict."""
        info, cal, res = self._left_calibration()
        return {
            # Fix: str(0) is "0" (truthy), so the previous `or "unknown"` was
            # dead code; playback instances (serial 0) now report "unknown".
            "serial_number": str(self._serial) if self._serial else "unknown",
            "model": str(info.camera_model),
            "fps": self._fps,
            "width": res.width,
            "height": res.height,
            "fx": cal.fx,
            "fy": cal.fy,
            "cx": cal.cx,
            "cy": cal.cy,
            "k1": cal.disto[0],
            "k2": cal.disto[1],
            "p1": cal.disto[2],
            "p2": cal.disto[3],
            "k3": cal.disto[4],
        }

from_svo(camera_name, svo_path, compute_sdk_depth=True) classmethod

Open an SVO2 file for frame-by-frame extraction.

Parameters

compute_sdk_depth : bool If True (default) depth is computed by the ZED SDK (NEURAL_LIGHT). Set to False when you will compute depth externally (e.g. with FFS); this skips the SDK depth pass for lower CPU/GPU load.

Source code in raiden/cameras/zed.py
@classmethod
def from_svo(
    cls,
    camera_name: str,
    svo_path: Path,
    compute_sdk_depth: bool = True,
) -> "ZedCamera":
    """Open an SVO2 file for frame-by-frame extraction.

    Parameters
    ----------
    compute_sdk_depth : bool
        If True (default) depth is computed by the ZED SDK (NEURAL_LIGHT).
        Set to False when you will compute depth externally (e.g. with FFS);
        this skips the SDK depth pass for lower CPU/GPU load.
    """
    cam = cls.__new__(cls)
    cam._name = camera_name
    cam._serial = 0
    cam._fps = 30
    cam._camera = sl.Camera()
    cam._is_open = False
    cam._image = sl.Mat()
    cam._depth = sl.Mat()
    cam._right_image = sl.Mat()
    cam._has_depth = compute_sdk_depth

    init_params = sl.InitParameters()
    init_params.set_from_svo_file(str(svo_path))
    init_params.svo_real_time_mode = False  # process all frames
    init_params.depth_mode = (
        sl.DEPTH_MODE.NEURAL_LIGHT if compute_sdk_depth else sl.DEPTH_MODE.NONE
    )
    init_params.coordinate_units = sl.UNIT.METER
    init_params.depth_minimum_distance = 0.01

    status = cam._camera.open(init_params)
    if status != sl.ERROR_CODE.SUCCESS:
        raise RuntimeError(f"Failed to open SVO2 '{svo_path}': {status}")

    cam._is_open = True
    return cam

get_camera_info()

Return camera intrinsics and metadata as a plain dict.

Source code in raiden/cameras/zed.py
def get_camera_info(self) -> dict:
    """Return camera intrinsics and metadata as a plain dict."""
    info = self._camera.get_camera_information()
    cal = info.camera_configuration.calibration_parameters.left_cam
    res = info.camera_configuration.resolution
    return {
        "serial_number": str(self._serial) or "unknown",
        "model": str(info.camera_model),
        "fps": self._fps,
        "width": res.width,
        "height": res.height,
        "fx": cal.fx,
        "fy": cal.fy,
        "cx": cal.cx,
        "cy": cal.cy,
        "k1": cal.disto[0],
        "k2": cal.disto[1],
        "p1": cal.disto[2],
        "p2": cal.disto[3],
        "k3": cal.disto[4],
    }

get_current_timestamp_ns()

Current host time in nanoseconds via the ZED SDK.

On the same clock as get_frame_timestamp_ns(), so robot observations recorded with this value can be directly interpolated against camera frame timestamps.

Source code in raiden/cameras/zed.py
def get_current_timestamp_ns(self) -> int:
    """Current host time in nanoseconds via the ZED SDK.

    On the same clock as ``get_frame_timestamp_ns()``, so robot observations
    recorded with this value can be directly interpolated against camera
    frame timestamps.
    """
    return self._camera.get_timestamp(sl.TIME_REFERENCE.CURRENT).get_nanoseconds()

get_frame()

Retrieve color (and depth if in playback mode) from last grab.

Source code in raiden/cameras/zed.py
def get_frame(self) -> CameraFrame:
    """Retrieve color (and depth if in playback mode) from last grab."""
    self._camera.retrieve_image(self._image, sl.VIEW.LEFT)
    color = self._image.get_data()[:, :, :3].copy()  # drop alpha

    depth: Optional[np.ndarray] = None
    if self._has_depth:
        self._camera.retrieve_measure(self._depth, sl.MEASURE.DEPTH)
        raw = self._depth.get_data().copy()
        depth = np.where(np.isfinite(raw), raw, 0.0).astype(np.float32)

    timestamp_ns = self._camera.get_timestamp(
        sl.TIME_REFERENCE.IMAGE
    ).get_nanoseconds()

    return CameraFrame(color=color, depth=depth, timestamp_ns=timestamp_ns)

get_frame_timestamp_ns()

Timestamp of the most recently grabbed frame in nanoseconds (ZED hardware clock).

Source code in raiden/cameras/zed.py
def get_frame_timestamp_ns(self) -> int:
    """Timestamp of the most recently grabbed frame in nanoseconds (ZED hardware clock)."""
    return self._camera.get_timestamp(sl.TIME_REFERENCE.IMAGE).get_nanoseconds()

get_intrinsics()

Return ZED SDK factory-calibrated intrinsics for the left lens.

Source code in raiden/cameras/zed.py
def get_intrinsics(self) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
    """Return ZED SDK factory-calibrated intrinsics for the left lens."""
    info = self._camera.get_camera_information()
    cal = info.camera_configuration.calibration_parameters.left_cam
    res = info.camera_configuration.resolution

    camera_matrix = np.array(
        [[cal.fx, 0.0, cal.cx], [0.0, cal.fy, cal.cy], [0.0, 0.0, 1.0]],
        dtype=np.float64,
    )
    # ZED disto order: k1, k2, p1, p2, k3
    dist_coeffs = np.array(
        [cal.disto[0], cal.disto[1], cal.disto[2], cal.disto[3], cal.disto[4]],
        dtype=np.float64,
    )
    image_size = (res.width, res.height)
    return camera_matrix, dist_coeffs, image_size

get_right_color()

Return the right-camera BGR image for the most recently grabbed frame.

Only valid after a successful grab(). Used by external stereo depth predictors (e.g. Fast Foundation Stereo) that need both views.

Source code in raiden/cameras/zed.py
def get_right_color(self) -> np.ndarray:
    """Return the right-camera BGR image for the most recently grabbed frame.

    Only valid after a successful grab(). Used by external stereo depth
    predictors (e.g. Fast Foundation Stereo) that need both views.
    """
    self._camera.retrieve_image(self._right_image, sl.VIEW.RIGHT)
    return self._right_image.get_data()[:, :, :3].copy()

get_stereo_calib()

Return (fx, baseline_m) for the left camera at the current resolution.

fx is the left-camera focal length in pixels and baseline_m is the stereo baseline in metres. Used to convert FFS disparity to depth.

Source code in raiden/cameras/zed.py
def get_stereo_calib(self) -> "tuple[float, float]":
    """Return ``(fx, baseline_m)`` for the left camera at the current resolution.

    ``fx`` is the left-camera focal length in pixels and ``baseline_m`` is
    the stereo baseline in metres. Used to convert FFS disparity to depth.
    """
    info = self._camera.get_camera_information()
    cal = info.camera_configuration.calibration_parameters
    fx = float(cal.left_cam.fx)
    baseline = float(abs(cal.get_camera_baseline()))
    return fx, baseline

get_total_frames()

Total frame count (valid for SVO2 playback only).

Source code in raiden/cameras/zed.py
def get_total_frames(self) -> int:
    """Total frame count (valid for SVO2 playback only)."""
    return self._camera.get_svo_number_of_frames()

grab()

Grab next frame (blocks until ready at camera FPS).

Source code in raiden/cameras/zed.py
def grab(self) -> bool:
    """Grab next frame (blocks until ready at camera FPS)."""
    runtime_params = sl.RuntimeParameters()
    status = self._camera.grab(runtime_params)
    return status == sl.ERROR_CODE.SUCCESS

open()

Open live camera. Depth is disabled to reduce recording overhead.

Source code in raiden/cameras/zed.py
def open(self) -> None:
    """Open live camera. Depth is disabled to reduce recording overhead."""
    init_params = sl.InitParameters()
    init_params.set_from_serial_number(self._serial)
    init_params.camera_resolution = sl.RESOLUTION.HD720
    init_params.camera_fps = self._fps
    init_params.depth_mode = sl.DEPTH_MODE.NONE
    init_params.coordinate_units = sl.UNIT.METER

    status = self._camera.open(init_params)
    if status != sl.ERROR_CODE.SUCCESS:
        raise RuntimeError(
            f"Failed to open ZED camera '{self._name}' "
            f"(serial: {self._serial}): {status}"
        )
    self._is_open = True
    self._has_depth = False

start_recording(path)

Enable SVO2 recording. Every subsequent grab() writes a frame.

Source code in raiden/cameras/zed.py
def start_recording(self, path: Path) -> None:
    """Enable SVO2 recording. Every subsequent grab() writes a frame."""
    params = sl.RecordingParameters()
    params.compression_mode = sl.SVO_COMPRESSION_MODE.H264
    params.video_filename = str(path)
    status = self._camera.enable_recording(params)
    if status != sl.ERROR_CODE.SUCCESS:
        raise RuntimeError(
            f"Failed to start SVO2 recording for '{self._name}': {status}"
        )

RealSense camera

Intel RealSense camera implementation (e.g. D405).

Recording mode opens by serial number and records to .bag via the SDK recorder. Playback mode opens a .bag file and reads color + depth frames; use RealSenseCamera.from_bag() to create a playback instance.

Warning: For dynamic tasks, prefer ZED cameras over RealSense. RealSense cameras have known synchronization limitations that make them less suitable for capturing fast robot motion. See Synchronization issues below for details.

Intrinsics note

RealSense cameras store per-device calibration on-board. get_intrinsics() reads the color stream's Brown-Conrady coefficients directly from the SDK, which is the same model OpenCV uses, so the values can be passed straight to cv2 functions without any conversion.

Synchronization issues

RealSense cameras (especially the D405) have several properties that make multi-camera synchronization harder than with ZED cameras:

Timestamp reliability The global_time_enabled option is supposed to stamp frames with the host wall-clock time, but support is inconsistent across D4xx firmware versions. When it is not supported, the SDK reports hardware-relative timestamps (milliseconds since device boot) that must be converted to wall-clock time. _measure_clock_offset() compensates for this by measuring the offset between the first frame timestamp and time.time_ns() at recording start, but this is only an approximation.

Frame extraction drain-loop bug The RealSense SDK pre-buffers frames during bag playback when set_real_time(False) is set. A naive "drain the queue to get the latest frame" loop (appropriate for live preview) will consume every other buffered frame, halving the apparent FPS to ~15 fps even when the bag was recorded at 30 fps. grab() therefore skips the drain loop in playback mode (_is_playback = True).

Recommendation Use ZED cameras for scene and wrist cameras in dynamic manipulation tasks. ZED timestamps (sl.TIME_REFERENCE.IMAGE) are wall-clock Unix nanoseconds, consistent across cameras and with the robot controller clock, making multi-camera alignment straightforward. RealSense cameras are best suited for static or slow-motion scenes where their close-range depth quality is the primary requirement.

RealSenseCamera

Bases: Camera

Intel RealSense D4xx camera – records to .bag

Source code in raiden/cameras/realsense.py
class RealSenseCamera(Camera):
    """Intel RealSense D4xx camera – records to .bag"""

    # Requested stream resolutions.  The SDK may negotiate a different color
    # resolution; _start_pipeline() records the actual one in _color_w/_color_h.
    _COLOR_W, _COLOR_H = 640, 480
    _DEPTH_W, _DEPTH_H = 640, 480

    def __init__(self, camera_name: str, serial_number: str, fps: int = 30):
        self._name = camera_name
        self._serial = serial_number
        self._fps = fps
        self._pipeline: Optional[rs.pipeline] = None
        self._config: Optional[rs.config] = None
        self._profile: Optional[rs.pipeline_profile] = None
        self._depth_scale: float = 0.001  # metres per raw unit (D405 default)
        self._latest_frames = None
        # Actual negotiated color resolution (set after pipeline.start()).
        self._color_w: int = self._COLOR_W
        self._color_h: int = self._COLOR_H
        # Offset (ns) to convert RS SDK timestamps to wall-clock:
        # wall_ns = frame.get_timestamp() * 1_000_000 + _clock_offset_ns
        # Measured once at start_recording() by comparing the first frame
        # timestamp to time.time_ns().  None until start_recording() is called.
        self._clock_offset_ns: Optional[int] = None

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Semantic name assigned to this camera (e.g. 'scene_camera')."""
        return self._name

    @property
    def serial_number(self) -> str:
        """Camera hardware serial number as string."""
        return str(self._serial)

    @property
    def recording_extension(self) -> str:
        """Native recording file extension."""
        return "bag"

    # ------------------------------------------------------------------
    # Live recording
    # ------------------------------------------------------------------

    def _start_pipeline(self, cfg: "rs.config") -> None:
        """Start the pipeline and record the negotiated color resolution."""
        self._profile = self._pipeline.start(cfg)
        self._config = cfg
        color_stream = self._profile.get_stream(
            rs.stream.color
        ).as_video_stream_profile()
        self._color_w = color_stream.width()
        self._color_h = color_stream.height()

    def open(self) -> None:
        """Open a live connection: enable color (BGR8) and depth (Z16) streams."""
        self._pipeline = rs.pipeline()
        cfg = rs.config()
        cfg.enable_device(self._serial)
        cfg.enable_stream(
            rs.stream.color, self._COLOR_W, self._COLOR_H, rs.format.bgr8, self._fps
        )
        cfg.enable_stream(
            rs.stream.depth, self._DEPTH_W, self._DEPTH_H, rs.format.z16, self._fps
        )
        self._start_pipeline(cfg)
        print(
            f"  [{self._name}] opened: "
            f"BGR8 {self._color_w}×{self._color_h} @ {self._fps}fps"
        )
        device = self._profile.get_device()
        _enable_global_time(device)
        depth_sensor = device.first_depth_sensor()
        # Per-device scale converting raw uint16 depth to metres.
        self._depth_scale = depth_sensor.get_depth_scale()

    def close(self) -> None:
        """Stop the pipeline and drop any cached frames."""
        if self._pipeline:
            self._pipeline.stop()
            self._pipeline = None
            self._profile = None
        # Drop the cached frameset so get_frame() cannot serve stale data
        # after the camera has been closed.
        self._latest_frames = None

    def start_recording(self, path: Path) -> None:
        """Restart pipeline with bag recorder attached."""
        if self._pipeline:
            self._pipeline.stop()

        cfg = rs.config()
        cfg.enable_device(self._serial)
        cfg.enable_stream(
            rs.stream.color, self._COLOR_W, self._COLOR_H, rs.format.bgr8, self._fps
        )
        cfg.enable_stream(
            rs.stream.depth, self._DEPTH_W, self._DEPTH_H, rs.format.z16, self._fps
        )
        cfg.enable_record_to_file(str(path))
        self._start_pipeline(cfg)
        _enable_global_time(self._profile.get_device())
        print(
            f"  [{self._name}] recording: "
            f"BGR8 {self._color_w}×{self._color_h} @ {self._fps}fps"
        )
        self._measure_clock_offset()

    def _measure_clock_offset(self) -> None:
        """Grab the first available frame and compute RS-to-wall-clock offset.

        ``_clock_offset_ns`` is the additive correction such that:
            wall_ns = int(frame.get_timestamp() * 1_000_000) + _clock_offset_ns

        This works whether or not global_time_enabled is supported by the
        device: if it is supported the stored timestamps are already wall-clock
        and the offset will be ~0; if not, the offset captures the difference
        between the RealSense hardware clock and the system clock at the
        moment recording begins, which is stable enough over a 10-60 s episode.
        """
        # Local import guards against a missing module-level `time` import;
        # without it a NameError here would be silently swallowed by the
        # broad except below and the offset would never be measured.
        import time

        try:
            frames = self._pipeline.wait_for_frames(timeout_ms=2000)
            wall_ns = time.time_ns()
            color_frame = frames.get_color_frame()
            if color_frame:
                rs_ns = int(color_frame.get_timestamp() * 1_000_000)
                self._clock_offset_ns = wall_ns - rs_ns
        except Exception:
            # Best-effort: leave timestamps uncorrected if no frame arrives.
            self._clock_offset_ns = None

    def stop_recording(self) -> None:
        """Restart pipeline without recorder to finalize the bag file."""
        if self._pipeline:
            self._pipeline.stop()
        # Rebuild config without enable_record_to_file — there is no
        # disable_record_and_playback() in the pyrealsense2 Python bindings.
        # Use the same color resolution that was negotiated in start_recording().
        cfg = rs.config()
        cfg.enable_device(self._serial)
        cfg.enable_stream(
            rs.stream.color, self._color_w, self._color_h, rs.format.bgr8, self._fps
        )
        cfg.enable_stream(
            rs.stream.depth, self._DEPTH_W, self._DEPTH_H, rs.format.z16, self._fps
        )
        self._config = cfg
        self._profile = self._pipeline.start(cfg)
        _enable_global_time(self._profile.get_device())

    def grab(self) -> bool:
        """Grab the next frameset; returns False on timeout/end-of-file."""
        try:
            frames = self._pipeline.wait_for_frames(timeout_ms=500)
            if not getattr(self, "_is_playback", False):
                # Drain any additional buffered frames so we always use the most
                # recent one. Bag-file writing overhead can cause the queue to
                # accumulate, making wait_for_frames return increasingly stale frames.
                # Skip this in playback mode: set_real_time(False) pre-buffers the
                # next frame, so draining would skip every other frame (30fps → 15fps).
                while True:
                    ok, newer = self._pipeline.try_wait_for_frames(timeout_ms=0)
                    if not ok:
                        break
                    frames = newer
            self._latest_frames = frames
            return True
        except RuntimeError:
            return False

    def _to_wall_ns(self, rs_ns: int) -> int:
        """Apply the measured clock offset to an SDK timestamp (already in ns).

        Before start_recording() — and in bag playback, where from_bag() sets
        the offset to None — timestamps are returned unchanged.
        """
        offset = getattr(self, "_clock_offset_ns", None)
        return rs_ns if offset is None else rs_ns + offset

    def get_current_timestamp_ns(self) -> int:
        """Return the capture timestamp of the most recently grabbed frame.

        With global_time_enabled the RealSense SDK stamps frames with system
        wall-clock time; otherwise the offset measured at start_recording()
        is applied (see _measure_clock_offset), so the result is on the same
        clock as time.time_ns() and can be used directly to align robot data
        with camera frames.  Falls back to time.time_ns() when no frame has
        been grabbed yet.
        """
        if self._latest_frames is not None:
            frame = self._latest_frames.get_color_frame()
            if frame:
                return self._to_wall_ns(int(frame.get_timestamp() * 1_000_000))
        import time

        return time.time_ns()

    def get_frame(self) -> CameraFrame:
        """Return the most recently grabbed frame as a CameraFrame.

        Raises:
            RuntimeError: if grab() has not been called successfully yet.
        """
        if self._latest_frames is None:
            raise RuntimeError("No frames available. Call grab() first.")

        frames = self._latest_frames
        # _align is only set in playback mode (from_bag); it maps depth onto
        # the color pixel grid.
        align = getattr(self, "_align", None)
        if align is not None:
            frames = align.process(frames)

        color_frame = frames.get_color_frame()
        depth_frame = frames.get_depth_frame()

        color = np.asanyarray(color_frame.get_data())  # BGR uint8
        depth_raw = np.asanyarray(depth_frame.get_data())  # uint16 raw units
        depth = (depth_raw * self._depth_scale).astype(np.float32)  # metres

        # RealSense timestamp is in milliseconds; convert to ns and correct
        # to wall-clock with the offset measured at start_recording().
        timestamp_ns = self._to_wall_ns(int(color_frame.get_timestamp() * 1_000_000))

        return CameraFrame(color=color, depth=depth, timestamp_ns=timestamp_ns)

    def get_intrinsics(self) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
        """Return on-device calibrated intrinsics for the color stream.

        RealSense stores per-device calibration on-board.  The distortion
        model is Brown-Conrady (identical to OpenCV's), so coefficients are
        directly usable with cv2 functions.
        """
        if self._profile is None:
            raise RuntimeError("Camera is not open. Call open() first.")

        color_stream = self._profile.get_stream(
            rs.stream.color
        ).as_video_stream_profile()
        intr = color_stream.get_intrinsics()

        camera_matrix = np.array(
            [[intr.fx, 0.0, intr.ppx], [0.0, intr.fy, intr.ppy], [0.0, 0.0, 1.0]],
            dtype=np.float64,
        )
        # Brown-Conrady: [k1, k2, p1, p2, k3]
        dist_coeffs = np.array(intr.coeffs[:5], dtype=np.float64)
        image_size = (intr.width, intr.height)
        return camera_matrix, dist_coeffs, image_size

    # ------------------------------------------------------------------
    # Playback from .bag file
    # ------------------------------------------------------------------

    @classmethod
    def from_bag(cls, camera_name: str, bag_path: Path) -> "RealSenseCamera":
        """Open a .bag file for frame-by-frame extraction."""
        # __new__ bypasses __init__ (which would open live hardware), so every
        # attribute __init__ normally sets must be initialized here explicitly.
        cam = cls.__new__(cls)
        cam._name = camera_name
        cam._serial = ""
        cam._fps = 30
        cam._depth_scale = 0.001
        cam._latest_frames = None
        cam._color_w = cls._COLOR_W
        cam._color_h = cls._COLOR_H
        # No offset in playback: bag timestamps are kept as recorded.
        cam._clock_offset_ns = None

        cam._pipeline = rs.pipeline()
        cam._config = rs.config()
        rs.config.enable_device_from_file(
            cam._config, str(bag_path), repeat_playback=False
        )
        cam._profile = cam._pipeline.start(cam._config)

        # Non-realtime playback so we don't drop frames
        playback = cam._profile.get_device().as_playback()
        playback.set_real_time(False)

        try:
            ds = cam._profile.get_device().first_depth_sensor()
            cam._depth_scale = ds.get_depth_scale()
        except Exception:
            # Best-effort: keep the D405 default scale if the bag lacks it.
            pass

        # Align depth to color so they share the same pixel grid
        cam._align = rs.align(rs.stream.color)
        cam._is_playback = True

        return cam

    def get_camera_info(self) -> dict:
        """Return camera intrinsics and metadata as a plain dict."""
        if not self._profile:
            return {}
        color_stream = self._profile.get_stream(
            rs.stream.color
        ).as_video_stream_profile()
        intr = color_stream.get_intrinsics()
        return {
            "serial_number": self._serial,
            "model": "RealSense",
            "fps": self._fps,
            "width": intr.width,
            "height": intr.height,
            "fx": intr.fx,
            "fy": intr.fy,
            "cx": intr.ppx,
            "cy": intr.ppy,
            "k1": intr.coeffs[0],
            "k2": intr.coeffs[1],
            "p1": intr.coeffs[2],
            "p2": intr.coeffs[3],
            "k3": intr.coeffs[4],
        }

from_bag(camera_name, bag_path) classmethod

Open a .bag file for frame-by-frame extraction.

Source code in raiden/cameras/realsense.py
@classmethod
def from_bag(cls, camera_name: str, bag_path: Path) -> "RealSenseCamera":
    """Open a .bag file for frame-by-frame extraction."""
    cam = cls.__new__(cls)
    cam._name = camera_name
    cam._serial = ""
    cam._fps = 30
    cam._depth_scale = 0.001
    cam._latest_frames = None

    cam._pipeline = rs.pipeline()
    cam._config = rs.config()
    rs.config.enable_device_from_file(
        cam._config, str(bag_path), repeat_playback=False
    )
    cam._profile = cam._pipeline.start(cam._config)

    # Non-realtime playback so we don't drop frames
    playback = cam._profile.get_device().as_playback()
    playback.set_real_time(False)

    try:
        ds = cam._profile.get_device().first_depth_sensor()
        cam._depth_scale = ds.get_depth_scale()
    except Exception:
        pass

    # Align depth to color so they share the same pixel grid
    cam._align = rs.align(rs.stream.color)
    cam._is_playback = True

    return cam

get_camera_info()

Return camera intrinsics and metadata as a plain dict.

Source code in raiden/cameras/realsense.py
def get_camera_info(self) -> dict:
    """Return camera intrinsics and metadata as a plain dict."""
    if not self._profile:
        return {}
    color_stream = self._profile.get_stream(
        rs.stream.color
    ).as_video_stream_profile()
    intr = color_stream.get_intrinsics()
    return {
        "serial_number": self._serial,
        "model": "RealSense",
        "fps": self._fps,
        "width": intr.width,
        "height": intr.height,
        "fx": intr.fx,
        "fy": intr.fy,
        "cx": intr.ppx,
        "cy": intr.ppy,
        "k1": intr.coeffs[0],
        "k2": intr.coeffs[1],
        "p1": intr.coeffs[2],
        "p2": intr.coeffs[3],
        "k3": intr.coeffs[4],
    }

get_current_timestamp_ns()

Return the capture timestamp of the most recently grabbed frame.

With global_time_enabled the RealSense SDK stamps frames with system wall-clock time, so this is on the same clock as time.time_ns() and can be used directly to align robot data with camera frames.

Source code in raiden/cameras/realsense.py
def get_current_timestamp_ns(self) -> int:
    """Return the capture timestamp of the most recently grabbed frame.

    With global_time_enabled the RealSense SDK stamps frames with system
    wall-clock time, so this is on the same clock as time.time_ns() and
    can be used directly to align robot data with camera frames.
    """
    if self._latest_frames is not None:
        frame = self._latest_frames.get_color_frame()
        if frame:
            return int(frame.get_timestamp() * 1_000_000)
    import time

    return time.time_ns()

get_intrinsics()

Return on-device calibrated intrinsics for the color stream.

RealSense stores per-device calibration on-board. The distortion model is Brown-Conrady (identical to OpenCV's), so coefficients are directly usable with cv2 functions.

Source code in raiden/cameras/realsense.py
def get_intrinsics(self) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
    """Return on-device calibrated intrinsics for the color stream.

    RealSense stores per-device calibration on-board.  The distortion model
    is Brown-Conrady (identical to OpenCV's), so coefficients are directly
    usable with cv2 functions.
    """
    if self._profile is None:
        raise RuntimeError("Camera is not open. Call open() first.")

    color_stream = self._profile.get_stream(
        rs.stream.color
    ).as_video_stream_profile()
    intr = color_stream.get_intrinsics()

    camera_matrix = np.array(
        [[intr.fx, 0.0, intr.ppx], [0.0, intr.fy, intr.ppy], [0.0, 0.0, 1.0]],
        dtype=np.float64,
    )
    # Brown-Conrady: [k1, k2, p1, p2, k3]
    dist_coeffs = np.array(intr.coeffs[:5], dtype=np.float64)
    image_size = (intr.width, intr.height)
    return camera_matrix, dist_coeffs, image_size

start_recording(path)

Restart pipeline with bag recorder attached.

Source code in raiden/cameras/realsense.py
def start_recording(self, path: Path) -> None:
    """Restart pipeline with bag recorder attached."""
    if self._pipeline:
        self._pipeline.stop()

    cfg = rs.config()
    cfg.enable_device(self._serial)
    cfg.enable_stream(
        rs.stream.color, self._COLOR_W, self._COLOR_H, rs.format.bgr8, self._fps
    )
    cfg.enable_stream(
        rs.stream.depth, self._DEPTH_W, self._DEPTH_H, rs.format.z16, self._fps
    )
    cfg.enable_record_to_file(str(path))
    self._start_pipeline(cfg)
    _enable_global_time(self._profile.get_device())
    print(
        f"  [{self._name}] recording: "
        f"BGR8 {self._color_w}×{self._color_h} @ {self._fps}fps"
    )
    self._measure_clock_offset()

stop_recording()

Restart pipeline without recorder to finalize the bag file.

Source code in raiden/cameras/realsense.py
def stop_recording(self) -> None:
    """Restart pipeline without recorder to finalize the bag file."""
    if self._pipeline:
        self._pipeline.stop()
    # Rebuild config without enable_record_to_file — there is no
    # disable_record_and_playback() in the pyrealsense2 Python bindings.
    # Use the same color resolution that was negotiated in start_recording().
    cfg = rs.config()
    cfg.enable_device(self._serial)
    cfg.enable_stream(
        rs.stream.color, self._color_w, self._color_h, rs.format.bgr8, self._fps
    )
    cfg.enable_stream(
        rs.stream.depth, self._DEPTH_W, self._DEPTH_H, rs.format.z16, self._fps
    )
    self._config = cfg
    self._profile = self._pipeline.start(cfg)
    _enable_global_time(self._profile.get_device())