Skip to content

Converter

Convert raw camera recordings to UnifiedDataset format.

Input: a recording directory produced by rd record. Output: UnifiedDataset layout inside the same directory.

Usage::

rd convert ./data/test_2/test_task_2_20260223_162246

Output layout::

<recording_dir>/
    split_all.json
    metadata_shared.json
    calibration_results.json      # copied from the first recording dir
    0000/
        metadata.json
        rgb/
            scene_camera/
                0000000000.jpg      # JPEG quality >= 90
                0000000001.jpg
                ...
            left_wrist_camera/
                ...
        depth/
            scene_camera/
                0000000000.npz      # np.uint16, millimetres
                ...
        lowdim/
            scene_camera/
                0000000000.npz      # intrinsics, extrinsics, action, language
                0000000001.npz
                ...
            left_wrist_camera/
                ...
            right_wrist_camera/
                ...
    cameras/                      # raw SVO2/bag files kept
    metadata.json                 # original recording metadata (updated: converted=true)
    robot_data.npz

convert_recording(recording_dir, episode_dir=None, stereo_method='zed', ffs_scale=1.0, ffs_iters=8, tri_stereo_variant='c64')

Convert a recording directory to UnifiedDataset format.

When episode_dir is provided the sequence data is written there instead of recording_dir/0000, and dataset-level files (split_all.json, metadata_shared.json, calibration_results.json) are skipped so the caller (e.g. :func:convert_task) can aggregate them.

Returns per-camera frame counts, or an empty dict if already converted.

Source code in raiden/converter.py
def convert_recording(
    recording_dir: str,
    episode_dir: Optional[str] = None,
    stereo_method: str = "zed",
    ffs_scale: float = 1.0,
    ffs_iters: int = 8,
    tri_stereo_variant: str = "c64",
) -> Dict[str, int]:
    """Convert a recording directory to UnifiedDataset format.

    When *episode_dir* is provided the sequence data is written there instead
    of ``recording_dir/0000``, and dataset-level files (split_all.json,
    metadata_shared.json, calibration_results.json) are skipped so the caller
    (e.g. :func:`convert_task`) can aggregate them.

    Args:
        recording_dir: Directory produced by ``rd record``; must contain a
            ``cameras/`` sub-directory with ``.svo2`` and/or ``.bag`` files.
        episode_dir: Optional override for the sequence output directory.
        stereo_method: Stereo/depth backend used for SVO2 extraction.
        ffs_scale: Resolution scale applied when ``stereo_method == "ffs"``.
        ffs_iters: Iteration count for the FFS stereo method.
        tri_stereo_variant: Model variant when ``stereo_method == "tri_stereo"``.

    Returns:
        Per-camera frame counts, or an empty dict if already converted.
    """
    rec_path = Path(recording_dir)
    if not rec_path.exists():
        print(f"Error: directory not found: {rec_path}")
        sys.exit(1)

    cameras_path = rec_path / "cameras"
    if not cameras_path.exists():
        print(f"Error: no cameras/ sub-directory in {rec_path}")
        sys.exit(1)

    svo2_files = sorted(cameras_path.glob("*.svo2"))
    bag_files = sorted(cameras_path.glob("*.bag"))

    if not svo2_files and not bag_files:
        print("No .svo2 or .bag files found in cameras/")
        sys.exit(1)

    seq_dir = Path(episode_dir) if episode_dir else rec_path / _SEQUENCE_NAME
    if (seq_dir / "rgb").exists():
        print(f"Already converted: {seq_dir}")
        return {}

    seq_dir.mkdir(parents=True, exist_ok=True)

    print(f"\nConverting to UnifiedDataset: {rec_path.name}")
    print(f"  Found {len(svo2_files)} SVO2 file(s), {len(bag_files)} bag file(s)\n")

    # ── load supporting data ──────────────────────────────────────────────
    rec_meta: dict = {}
    meta_path = rec_path / "metadata.json"
    if meta_path.exists():
        with open(meta_path) as f:
            rec_meta = json.load(f)

    robot_data: Optional[Dict[str, np.ndarray]] = None
    robot_path = rec_path / "robot_data.npz"
    if robot_path.exists():
        npz = np.load(robot_path, allow_pickle=False)
        robot_data = {k: npz[k] for k in npz.files}

    calib: Optional[dict] = None
    calib_path = rec_path / "calibration_results.json"
    if calib_path.exists():
        with open(calib_path) as f:
            calib = json.load(f)

    # "right_base_to_left_base" maps left_arm_base → right_arm_base (despite the name).
    # Invert to get the transform that brings right_arm_base points into left_arm_base.
    T_left_base_from_right_base: Optional[np.ndarray] = None
    if calib and "bimanual_transform" in calib:
        mat = calib["bimanual_transform"].get("right_base_to_left_base")
        if mat is not None:
            T_left_base_from_right_base = np.linalg.inv(np.array(mat, dtype=np.float32))
    if T_left_base_from_right_base is None:
        print(
            "  Warning: bimanual_transform not found in calibration; right wrist extrinsics will be in right_arm_base frame"
        )

    # ── extract frames ────────────────────────────────────────────────────
    frame_counts: Dict[str, int] = {}
    camera_infos: Dict[str, Optional[dict]] = {}
    cam_timestamps: Dict[str, Optional[np.ndarray]] = {}

    # SVO2 cameras are extracted together with cross-camera temporal alignment.
    # Skip only when every SVO2 camera already has its rgb dir AND timestamps.npy
    # (the latter is written by _extract_svo2_synchronized).
    svo2_names = [p.stem for p in svo2_files]
    svo2_all_done = svo2_names and all(
        (seq_dir / "rgb" / name).exists()
        and (seq_dir / "rgb" / name / "timestamps.npy").exists()
        for name in svo2_names
    )

    if svo2_all_done:
        for name in svo2_names:
            rgb_dir = seq_dir / "rgb" / name
            n = len(list(rgb_dir.glob("*.jpg")))
            print(f"  Skipping {name} (already extracted, {n} frames)")
            frame_counts[name] = n
            camera_infos[name] = None
            cam_timestamps[name] = np.load(str(rgb_dir / "timestamps.npy"))
    elif svo2_files:
        # Pre-scan to determine the min frame count cap.
        pre_counts: Dict[str, int] = {}
        for svo_path in svo2_files:
            name = svo_path.stem
            rgb_dir_check = seq_dir / "rgb" / name
            if rgb_dir_check.exists():
                pre_counts[name] = len(list(rgb_dir_check.glob("*.jpg")))
            else:
                print(f"  Pre-scanning {svo_path.name} ...")
                pre_counts[name] = _count_svo2_frames(svo_path)
                print(f"    {pre_counts[name]} frames")
        max_frames_svo2 = min(pre_counts.values()) if pre_counts else None
        if max_frames_svo2 is not None:
            print(
                f"  Capping SVO2 extraction at {max_frames_svo2} frames (min across cameras)\n"
            )

        print(f"  Extracting {len(svo2_files)} SVO2 file(s) in sync ...")
        if stereo_method != "zed":
            extra = ""
            if stereo_method == "ffs" and ffs_scale != 1.0:
                extra = f" (scale={ffs_scale})"
            elif stereo_method == "tri_stereo":
                extra = f" (variant={tri_stereo_variant})"
            print(f"  Stereo method: {stereo_method}{extra}")
        sync_results = _extract_svo2_synchronized(
            svo_paths=svo2_files,
            names=svo2_names,
            rgb_dirs=[seq_dir / "rgb" / n for n in svo2_names],
            depth_dirs=[seq_dir / "depth" / n for n in svo2_names],
            flips=[n in _FLIP_CAMERAS for n in svo2_names],
            max_frames=max_frames_svo2,
            stereo_method=stereo_method,
            ffs_scale=ffs_scale,
            ffs_iters=ffs_iters,
            tri_stereo_variant=tri_stereo_variant,
        )
        for name, (ts_arr, info) in sync_results.items():
            frame_counts[name] = len(ts_arr)
            camera_infos[name] = info
            cam_timestamps[name] = ts_arr
            print(f"  ✓ {name}: {len(ts_arr)} frames")

    # Bag files are extracted independently then aligned to ZED by timestamp.
    bag_max = min(frame_counts.values()) if frame_counts else None
    rs_offsets = rec_meta.get("realsense_clock_offsets", {})
    for bag_path in bag_files:
        name = bag_path.stem
        rgb_dir = seq_dir / "rgb" / name
        depth_dir = seq_dir / "depth" / name

        if rgb_dir.exists():
            n = len(list(rgb_dir.glob("*.jpg")))
            print(f"  Skipping {name} (already extracted, {n} frames)")
            frame_counts[name] = n
            camera_infos[name] = None
            ts_path = rgb_dir / "timestamps.npy"
            if ts_path.exists():
                ts_arr = np.load(str(ts_path))
                clock_offset = rs_offsets.get(name)
                cam_timestamps[name] = (
                    ts_arr + int(clock_offset) if clock_offset is not None else ts_arr
                )
            else:
                cam_timestamps[name] = None
            continue

        flip = name in _FLIP_CAMERAS
        print(f"  Extracting {bag_path.name}" + (" (flipped)" if flip else ""))
        ts_arr, info = _extract_bag(
            bag_path, rgb_dir, depth_dir, flip=flip, max_frames=bag_max
        )
        frame_counts[name] = len(ts_arr)
        camera_infos[name] = info

        # With global_time_enabled, RealSense timestamps are wall-clock (same as
        # ZED).  Old recordings may carry a clock offset in metadata; apply it
        # for backward compatibility.
        clock_offset = rs_offsets.get(name)
        if len(ts_arr) > 0:
            cam_timestamps[name] = (
                ts_arr + int(clock_offset) if clock_offset is not None else ts_arr
            )
        else:
            cam_timestamps[name] = None
        print(f"  ✓ {name}: {len(ts_arr)} frames\n")

    # Align all cameras to their overlapping recording window, correcting for
    # sequential startup offsets between ZED and RealSense cameras.
    cam_timestamps, frame_counts = _align_cameras_by_timestamp(
        seq_dir,
        cam_timestamps,
        frame_counts,
        camera_start_times_ns=rec_meta.get("camera_start_times_ns"),
    )

    # Trim all cameras to the same (minimum) frame count.
    # n_min is computed unconditionally: it is used below for the lowdim and
    # metadata steps, and was previously unbound (NameError) when alignment
    # left frame_counts empty.
    n_min = min(frame_counts.values()) if frame_counts else 0
    if frame_counts:
        frame_counts = {k: n_min for k in frame_counts}
        cam_timestamps = {
            k: (ts[:n_min] if ts is not None else None)
            for k, ts in cam_timestamps.items()
        }

    cameras = list(frame_counts.keys())

    # ── lowdim ────────────────────────────────────────────────────────────
    print("\nBuilding lowdim...")
    _build_lowdim(
        seq_dir=seq_dir,
        cameras=cameras,
        n_frames=n_min,
        camera_infos=camera_infos,
        calib=calib,
        robot_data=robot_data,
        rec_meta=rec_meta,
        flip_cameras=_FLIP_CAMERAS,
        right_base_to_left_base=T_left_base_from_right_base,
        cam_timestamps=cam_timestamps,
    )
    print(f"  ✓ lowdim/ ({n_min} frames)")

    # ── sequence metadata ─────────────────────────────────────────────────
    _build_sequence_metadata(seq_dir, cameras, frame_counts, rec_meta, camera_infos)
    print("  ✓ metadata.json")

    if episode_dir is None:
        # ── dataset-level files ───────────────────────────────────────────
        _build_split(rec_path, frame_counts)
        print("  ✓ split_all.json")

        shutil.copy(seq_dir / "metadata.json", rec_path / "metadata_shared.json")
        print("  ✓ metadata_shared.json")

        if calib_path.exists():
            shutil.copy(calib_path, rec_path / "calibration_results.json")
            print(f"  ✓ calibration_results.json (copied from {calib_path})")
        else:
            print(f"  Warning: calibration_results.json not found at {calib_path}")

    # ── update original metadata ──────────────────────────────────────────
    if meta_path.exists():
        with open(meta_path) as f:
            meta = json.load(f)
        meta["converted"] = True
        with open(meta_path, "w") as f:
            json.dump(meta, f, indent=2)

    print(f"\n✓ UnifiedDataset ready: {seq_dir if episode_dir else rec_path}")
    return frame_counts

convert_task(task_dir, output_dir=None, stereo_method='zed', ffs_scale=1.0, ffs_iters=8, reconvert=False, processed_base=None, tri_stereo_variant='c64')

Convert all recordings in a task directory into a single UnifiedDataset.

Each recording becomes a numbered episode (0000, 0001, …) under out_base::

<output_dir>/<task_name>/
    0000/
    0001/
    ...
    split_all.json
    metadata_shared.json
    calibration_results.json

output_dir defaults to <task_parent>/processed_data.

Source code in raiden/converter.py
def convert_task(
    task_dir: str,
    output_dir: Optional[str] = None,
    stereo_method: str = "zed",
    ffs_scale: float = 1.0,
    ffs_iters: int = 8,
    reconvert: bool = False,
    processed_base: Optional[str] = None,
    tri_stereo_variant: str = "c64",
) -> None:
    """Convert all recordings in a task directory into a single UnifiedDataset.

    Each recording becomes a numbered episode (0000, 0001, …) under *out_base*::

        <output_dir>/<task_name>/
            0000/
            0001/
            ...
            split_all.json
            metadata_shared.json
            calibration_results.json

    *output_dir* defaults to ``data/processed/<task_name>``; *processed_base*,
    when given, overrides that base directory.

    Args:
        task_dir: Directory containing one recording sub-directory per demo
            (each must have a ``cameras/`` folder).
        output_dir: Explicit output base; takes precedence over
            *processed_base*.
        stereo_method, ffs_scale, ffs_iters, tri_stereo_variant: Passed
            through to :func:`convert_recording`.
        reconvert: Re-convert recordings already marked converted in the DB.
        processed_base: Alternative output base when *output_dir* is unset.
    """
    task_path = Path(task_dir)
    if not task_path.exists():
        print(f"Error: directory not found: {task_path}")
        sys.exit(1)

    recording_dirs = sorted(
        d for d in task_path.iterdir() if d.is_dir() and (d / "cameras").exists()
    )

    if not recording_dirs:
        print(f"Error: no recording directories found in {task_path}")
        sys.exit(1)

    if output_dir:
        out_base = Path(output_dir) / task_path.name
    elif processed_base:
        out_base = Path(processed_base) / task_path.name
    else:
        out_base = Path("data") / "processed" / task_path.name
    out_base.mkdir(parents=True, exist_ok=True)

    print(f"Found {len(recording_dirs)} recording(s) in {task_path.name}")
    print(f"Output → {out_base}\n")

    episode_frame_counts: Dict[str, int] = {}

    # Lazy import to avoid hard dependency when DB is not set up
    try:
        from raiden.db.database import get_db as _get_db

        _db = _get_db()
    except Exception:
        _db = None

    # Filter recordings: skip failures/pending, and (unless reconvert) already
    # converted ones.  Recordings with no DB entry are treated as unknown and
    # included so directories recorded before the DB was set up are not dropped.
    success_dirs = []
    skipped = 0
    for rec_dir in recording_dirs:
        status = "unknown"
        already_converted = False
        if _db is not None:
            try:
                demo = _db.get_demonstration_by_raw_path(str(rec_dir))
                if demo is not None:
                    status = demo.get("status", "pending")
                    already_converted = bool(demo.get("converted", False))
            except Exception:
                pass
        if status not in ("success", "unknown"):
            print(f"  Skipping {rec_dir.name} (status={status})")
            skipped += 1
        elif already_converted and not reconvert:
            print(
                f"  Skipping {rec_dir.name} (already converted, use --reconvert to force)"
            )
            skipped += 1
        else:
            success_dirs.append(rec_dir)

    if skipped:
        print(f"\nSkipped {skipped} non-success recording(s).")
    if not success_dirs:
        print("No successful recordings to convert.")
        return

    print(f"Converting {len(success_dirs)} successful recording(s)\n")

    for i, rec_dir in enumerate(success_dirs):
        episode_name = f"{i:04d}"
        ep_dir = out_base / episode_name
        # Separator added: the source → episode mapping was previously printed
        # with the two names fused together.
        print(f"[{i + 1}/{len(success_dirs)}] {rec_dir.name} → {episode_name}/")
        counts = convert_recording(
            str(rec_dir),
            episode_dir=str(ep_dir),
            stereo_method=stereo_method,
            ffs_scale=ffs_scale,
            ffs_iters=ffs_iters,
            tri_stereo_variant=tri_stereo_variant,
        )

        if counts:
            episode_frame_counts[episode_name] = max(counts.values())
        else:
            # Already converted — read frame count from existing metadata.
            meta_file = ep_dir / "metadata.json"
            if meta_file.exists():
                with open(meta_file) as f:
                    ep_meta = json.load(f)
                episode_frame_counts[episode_name] = ep_meta.get("num_frames", 0)
            else:
                episode_frame_counts[episode_name] = 0

        # Update Demonstration status in DB
        if _db is not None:
            try:
                demo = _db.get_demonstration_by_raw_path(str(rec_dir))
                if demo is not None:
                    _db.update_demonstration(
                        demo["id"],
                        converted=True,
                        converted_data_path=str(ep_dir),
                    )
            except Exception:
                pass

    # ── combined split_all.json ────────────────────────────────────────────
    total_frames = sum(episode_frame_counts.values())
    n_eps = len(episode_frame_counts)
    split = {
        "filters": {},
        "size": {"n_seqs": n_eps, "n_samples": n_eps, "n_frames": total_frames},
        "files": episode_frame_counts,
    }
    with open(out_base / "split_all.json", "w") as f:
        json.dump(split, f, indent=2)
    print(f"\n✓ split_all.json ({n_eps} episodes, {total_frames} total frames)")

    # ── shared metadata & calibration ─────────────────────────────────────
    first_ep = next(iter(episode_frame_counts))
    first_meta = out_base / first_ep / "metadata.json"
    if first_meta.exists():
        shutil.copy(first_meta, out_base / "metadata_shared.json")
        print("✓ metadata_shared.json")

    first_calib = recording_dirs[0] / "calibration_results.json"
    if first_calib.exists():
        shutil.copy(first_calib, out_base / "calibration_results.json")
        print("✓ calibration_results.json")

    print(f"\n✓ Task dataset ready: {out_base}")

select_tasks(data_dir='data')

Use fzf to select one or more task directories (Tab to multi-select).

Source code in raiden/converter.py
def select_tasks(data_dir: str = "data") -> List[str]:
    """Use fzf to select one or more task directories (Tab to multi-select).

    Scans ``<data_dir>/raw`` for task directories that contain at least one
    recording sub-directory (identified by a ``cameras/`` folder inside it).

    Args:
        data_dir: Base data directory; tasks are looked up under its ``raw/``
            sub-directory.

    Returns:
        The selected task directory paths as strings (possibly empty).
    """
    base = Path(data_dir) / "raw"

    # Guard against a missing base directory: iterdir() would otherwise raise
    # FileNotFoundError.  Exit gracefully, consistent with convert_recording
    # and convert_task.
    if not base.is_dir():
        print(f"No tasks found in {base}")
        sys.exit(1)

    task_dirs = sorted(
        d
        for d in base.iterdir()
        if d.is_dir()
        and any((sub / "cameras").exists() for sub in d.iterdir() if sub.is_dir())
    )

    if not task_dirs:
        print(f"No tasks found in {base}")
        sys.exit(1)

    labels = {
        f"{d.name}  ({sum(1 for s in d.iterdir() if s.is_dir() and (s / 'cameras').exists())} recording(s))": d
        for d in task_dirs
    }

    from raiden.utils import fzf_select

    selected = fzf_select(list(labels), prompt="Convert task(s)> ", multi=True)
    return [str(labels[s]) for s in selected]