Skip to content

Chiral

Chiral

Alpha

This project is in early alpha. APIs may change without notice.

Compact interface for robot policy evaluation.

Sensor observations — RGB images, depth maps, camera intrinsics, camera extrinsics, and proprioception — are streamed from a robot server to a policy client in a separate process. Observations and actions flow on independent channels so chunked policy predictions never stall waiting for camera data. All communication is handled by the library; only application logic needs to be implemented.


Architecture

  ┌─────────────────┐   obs stream (30 Hz)    ┌──────────────────┐
  │  PolicyServer   │ ──────────────────────► │  PolicyClient    │
  │  (robot side)   │                         │  (policy side)   │
  │                 │ ◄────────────────────── │                  │
  └─────────────────┘  action dispatch (10Hz) └──────────────────┘

The server runs on the robot side. The client connects, calls reset() once to start an episode, then runs three concurrent threads: one that continuously polls for the latest observation, one that runs policy inference and enqueues action chunks, and one that dispatches actions to the robot at a fixed Hz.


Quick Start

Server (robot side)

import threading, time
import numpy as np
import chiral

H, W = 480, 640

class MyServer(chiral.PolicyServer):
    def camera_configs(self):
        return [chiral.CameraConfig(
            name="wrist_cam", height=H, width=W, channels=3,
            has_depth=True,
            intrinsics=np.array([[600,0,320],[0,600,240],[0,0,1]], dtype=np.float64),
            extrinsics=np.eye(4, dtype=np.float64),
        )]

    def proprio_configs(self):
        return [chiral.ProprioConfig(name="joint_pos", size=7)]

    def __init__(self):
        super().__init__(host="0.0.0.0", port=8765)
        for name in self.images:
            threading.Thread(target=self._camera_loop, args=(name,), daemon=True).start()

    def _camera_loop(self, name: str):
        while True:
            frame = np.random.randint(0, 255, (H, W, 3), dtype=np.uint8)
            depth = np.random.rand(H, W).astype(np.float32)
            self.update_image(name, frame)
            self.update_depth(name, depth)
            time.sleep(1 / 30)

    async def reset(self):
        return self._make_obs(), {}

    async def apply_action(self, action):
        pass  # send action to robot hardware

MyServer().run()

Client (policy side)

import threading
import numpy as np
import chiral

def policy_loop(env, stop):
    while not stop.is_set():
        obs = env.latest_obs
        if obs is None:
            continue
        print(obs["wrist_cam"].image.shape, obs["wrist_cam"].intrinsics)
        actions = np.zeros([8, 7], dtype=np.float32)  # chunked predictions
        for a in actions:
            env.put_action(a)

with chiral.PolicyClient("ws://localhost:8765") as env:
    obs, info = env.reset()
    env.start_obs_stream(hz=30)
    env.start_action_dispatch(hz=10)

    stop = threading.Event()
    t = threading.Thread(target=policy_loop, args=(env, stop))
    t.start()
    # ... run for desired duration, then:
    stop.set(); t.join()

Server (robot side)

#include <chiral/server.hpp>
#include <Eigen/Dense>
#include <atomic>
#include <thread>

class MyServer : public chiral::PolicyServer {
    std::atomic<bool> running_{true};
public:
    MyServer() : PolicyServer(
        []{
            chiral::CameraConfig c;
            c.name = "wrist_cam"; c.height = 480; c.width = 640;
            c.channels = 3; c.has_depth = true;
            c.intrinsics << 600, 0, 320, 0, 600, 240, 0, 0, 1;
            c.extrinsics = Eigen::Matrix4d::Identity();
            return std::vector<chiral::CameraConfig>{c};
        }(),
        {{"joint_pos", 7}},
        "0.0.0.0", 8765)
    {
        for (std::size_t i = 0; i < configs_.size(); ++i)
            std::thread(&MyServer::camera_loop, this, i).detach();
    }
    ~MyServer() { running_ = false; }

    std::pair<chiral::Observation, chiral::InfoMap> reset() override {
        return {make_obs(0.0), {}};
    }

    // C++ still uses the legacy coupled step() API
    chiral::StepResult step(const chiral::Action&) override {
        chiral::StepResult r;
        r.obs = make_obs(); r.reward = 0.f;
        r.terminated = false; r.truncated = false;
        return r;
    }
private:
    void camera_loop(std::size_t idx) {
        while (running_) {
            Eigen::Matrix4d T = Eigen::Matrix4d::Identity();
            update_extrinsics(idx, T);
            std::this_thread::sleep_for(std::chrono::milliseconds(33));
        }
    }
};

int main() { MyServer().run(); }

Client (policy side)

#include <chiral/client.hpp>
#include <cstdio>

int main() {
    chiral::PolicyClient env("ws://localhost:8765");
    env.connect();

    auto [obs, info] = env.reset();

    for (int i = 0; i < 100; ++i) {
        chiral::Action action;
        action.N = 1; action.D = 7;
        action.data.assign(7, 0.f);

        auto res = env.step(action);
        obs = std::move(res.obs);
        std::printf("step %d  reward=%.2f\n", i, res.reward);
        if (res.terminated || res.truncated) break;
    }

    env.close();
}

Next Steps