Server Guide
The server runs on the robot side; it owns the sensor hardware and the environment state. The client connects, calls reset() to start an episode, then independently polls for observations and dispatches actions at fixed rates.
Threading Model
The server runs one network thread (the asyncio event loop) that handles WebSocket messages. When the client sends an obs_request, the network thread calls get_obs() to snapshot the current sensor state. When the client sends an apply_action, the network thread calls apply_action(); no response is sent back.
Meanwhile, one or more sensor threads run independently: a camera capture thread reads frames from the hardware at 30 Hz; a proprioception thread reads joint state at 500 Hz. These threads write into the pre-allocated buffers using the update_* helpers.
The key invariant is that a sensor thread and the network thread must never corrupt a buffer by writing and reading it simultaneously. Chiral solves this with one mutex per camera that protects all four buffers for that camera (image pixels, depth map, intrinsics, extrinsics), and one mutex per proprio stream.
```
                  ┌─────────────────────────────────┐
camera thread     │ update_image / update_depth     │
(30 Hz)           │ update_intrinsics / update_extr │──► images_[i], depths_[i]
                  │   (acquire cam_mutex_[i])       │    intrinsics_[i], extrinsics_[i]
                  └────────────────────┬────────────┘
                                       │ same mutex
                  ┌────────────────────▼────────────┐
network thread    │ get_obs() / make_obs()          │──► Observation snapshot
(on obs_req)      │   (acquire cam_mutex_[i])       │
                  └─────────────────────────────────┘
```
Because each camera has its own independent mutex, multiple camera threads can write concurrently without blocking each other. The snapshot in make_obs() acquires each camera's lock in turn, copies the buffers, and releases the lock before moving to the next camera.
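The pattern can be sketched with plain threading primitives. The class and names below are illustrative stand-ins, not the actual Chiral internals:

```python
import threading

import numpy as np

class CameraBuffers:
    """One lock guards every buffer belonging to one camera (sketch)."""

    def __init__(self, height: int, width: int):
        self.lock = threading.Lock()
        self.image = np.zeros((height, width, 3), dtype=np.uint8)
        self.extrinsics = np.eye(4, dtype=np.float64)

    def update_image(self, frame: np.ndarray) -> None:
        # Acquire, copy, release immediately. The hardware read that
        # produced 'frame' happened outside the lock.
        with self.lock:
            self.image[...] = frame

    def snapshot(self):
        # The caller receives private copies, so the sensor thread can
        # resume writing the moment the lock is released.
        with self.lock:
            return self.image.copy(), self.extrinsics.copy()

cams = {"wrist_cam": CameraBuffers(480, 640), "head_cam": CameraBuffers(720, 1280)}

def make_obs() -> dict:
    # Lock each camera in turn; independent locks mean two cameras never
    # block each other, only the one currently being copied.
    return {name: buf.snapshot() for name, buf in cams.items()}
```

Because `snapshot()` returns copies, the observation stays coherent even while the sensor threads keep writing.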
Do not hold a camera lock across slow operations
The update_* helpers acquire the lock, copy the data, and release immediately. Never hold the lock across a hardware read (e.g. blocking on a USB transfer) or you will stall the network thread.
Step 1 — Declare Cameras
The first thing to implement is camera_configs() (Python) or pass a vector<CameraConfig> to the constructor (C++). This is called once at construction time. The base class uses the returned configs to pre-allocate all buffers and mutexes before any step is taken.
```python
import numpy as np

import chiral

class MyServer(chiral.PolicyServer):
    def camera_configs(self) -> list[chiral.CameraConfig]:
        # Intrinsics: standard pinhole camera matrix K
        #   [fx  0 cx]
        #   [ 0 fy cy]
        #   [ 0  0  1]
        K = np.array([
            [600.0,   0.0, 320.0],
            [  0.0, 600.0, 240.0],
            [  0.0,   0.0,   1.0],
        ], dtype=np.float64)
        return [
            chiral.CameraConfig(
                name="wrist_cam",        # unique string identifier
                height=480,              # image height in pixels
                width=640,               # image width in pixels
                channels=3,              # 3 for RGB, 1 for grayscale
                has_depth=True,          # set True to allocate a depth buffer too
                intrinsics=K,            # (3,3) float64 — initial K; updated each frame if zoom changes
                extrinsics=np.eye(4, dtype=np.float64),  # (4,4) float64 — initial camera-to-world T
                image_dtype=np.uint8,    # pixel dtype; uint8 is the most common choice
                depth_dtype=np.float32,  # depth dtype; float32 in metres
            ),
            chiral.CameraConfig(
                name="head_cam",
                height=720, width=1280, channels=3,
                has_depth=False,  # no depth for this camera — no depth buffer allocated
                intrinsics=np.eye(3, dtype=np.float64),
                extrinsics=np.eye(4, dtype=np.float64),
            ),
        ]
```
```cpp
#include <chiral/server.hpp>
#include <Eigen/Dense>

static std::vector<chiral::CameraConfig> make_cam_configs() {
  // Intrinsics: standard pinhole camera matrix K
  Eigen::Matrix3d K;
  K << 600.0,   0.0, 320.0,
         0.0, 600.0, 240.0,
         0.0,   0.0,   1.0;

  chiral::CameraConfig wrist;
  wrist.name = "wrist_cam";  // unique string identifier
  wrist.height = 480;        // image height in pixels
  wrist.width = 640;         // image width in pixels
  wrist.channels = 3;        // 3 for RGB, 1 for grayscale
  wrist.has_depth = true;    // allocate a depth buffer too
  wrist.intrinsics = K;      // Eigen::Matrix3d — initial K
  wrist.extrinsics = Eigen::Matrix4d::Identity();  // initial camera-to-world T

  chiral::CameraConfig head;
  head.name = "head_cam";
  head.height = 720;
  head.width = 1280;
  head.channels = 3;
  head.has_depth = false;    // no depth buffer for this camera
  head.intrinsics = Eigen::Matrix3d::Identity();
  head.extrinsics = Eigen::Matrix4d::Identity();

  return {wrist, head};
}
```
CameraConfig Fields
| Field | Type | Default | Description |
|---|---|---|---|
| `name` | `str` / `string` | — | Unique camera identifier. Used as the key in obs["name"]. |
| `height` | `int` | — | Image height in pixels. |
| `width` | `int` | — | Image width in pixels. |
| `channels` | `int` | `3` | Number of color channels (3 for RGB, 1 for grayscale). |
| `has_depth` | `bool` | `False` | Whether to allocate and stream a depth buffer. |
| `intrinsics` | `ndarray(3,3)` / `Matrix3d` | identity | Initial camera matrix K. Updated per-frame if the lens zooms. |
| `extrinsics` | `ndarray(4,4)` / `Matrix4d` | identity | Initial camera-to-world transform T. Updated per-frame for moving cameras. |
| `image_dtype` | `np.dtype` | `np.uint8` | Pixel element dtype. (Python only; C++ always uses `uint8_t`.) |
| `depth_dtype` | `np.dtype` | `np.float32` | Depth element dtype. (Python only; C++ always uses `float` / float32.) |
Intrinsics and extrinsics are initial values
The values you pass in CameraConfig are used to initialize the internal buffers. They are not locked in — call update_intrinsics or update_extrinsics from your sensor thread each frame to keep them current. Both are included in every observation sent to the client.
Step 2 — Declare Proprioception (Optional)
If your robot reads joint positions, velocities, torques, or any other 1-D float32 stream, add ProprioConfig entries. The base class pre-allocates a separate buffer and a separate mutex for each stream.
```python
def proprio_configs(self) -> list[chiral.ProprioConfig]:
    return [
        chiral.ProprioConfig(name="joint_pos", size=7),  # 7-DOF arm positions (rad)
        chiral.ProprioConfig(name="joint_vel", size=7),  # 7-DOF arm velocities (rad/s)
        chiral.ProprioConfig(name="ee_pose", size=7),    # end-effector pose (x,y,z,qx,qy,qz,qw)
    ]
```
ProprioConfig Fields
| Field | Type | Description |
|---|---|---|
| `name` | `str` / `string` | Unique stream identifier. Used as the key in obs.proprios["name"] (Python) or obs.proprio("name") (C++). |
| `size` | `int` | Number of float32 elements in the vector. |
| `dtype` | `np.dtype` | Element dtype. Default `np.float32`. (Python only.) |
If proprio_configs() returns an empty list (the default), no proprio buffers are allocated and obs.proprios will be empty on the client.
Step 3 — Start Sensor Threads
After calling super().__init__() (Python) or after the base class constructor runs (C++), all buffers and locks are ready. This is the right moment to launch sensor threads.
```python
import threading

class MyServer(chiral.PolicyServer):
    def camera_configs(self):
        ...  # (as above)

    def proprio_configs(self):
        ...  # (as above)

    def __init__(self):
        # Must call super().__init__() first — it calls camera_configs() and
        # proprio_configs() and pre-allocates self.images, self.depths,
        # self.intrinsics, self.extrinsics, self.proprios, and all locks.
        super().__init__(host="0.0.0.0", port=8765)
        self._running = True

        # Launch one capture thread per camera.
        # self.images is a dict keyed by camera name, so iterating its keys
        # gives exactly the names declared in camera_configs().
        for name in self.images:
            threading.Thread(
                target=self._camera_loop,
                args=(name,),
                daemon=True,  # thread dies when the main process exits
            ).start()

        # Launch one update thread per proprio stream.
        for name in self.proprios:
            threading.Thread(
                target=self._proprio_loop,
                args=(name,),
                daemon=True,
            ).start()
```
```cpp
#include <atomic>
#include <thread>

class MyServer : public chiral::PolicyServer {
  std::atomic<bool> running_{true};

 public:
  MyServer(const std::string& host, int port)
      : PolicyServer(make_cam_configs(), make_proprio_configs(), host, port) {
    // configs_ and proprio_configs_ are populated by the base class
    // constructor, so we can iterate them here safely.

    // Launch one capture thread per camera (indexed by position).
    for (std::size_t i = 0; i < configs_.size(); ++i)
      std::thread(&MyServer::camera_loop, this, i).detach();

    // Launch one update thread per proprio stream.
    for (std::size_t i = 0; i < proprio_configs_.size(); ++i)
      std::thread(&MyServer::proprio_loop, this, i).detach();
  }

  ~MyServer() { running_ = false; }
};
```
Daemon threads in Python
Mark all sensor threads as daemon=True so they are automatically killed when the main process exits. If you need clean shutdown, set a threading.Event and check it in the loop.
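A minimal sketch of that clean-shutdown pattern, using only the standard library (the hardware read is elided):

```python
import threading
import time

stop = threading.Event()

def sensor_loop() -> None:
    # Check the event each iteration instead of looping on a bare True.
    while not stop.is_set():
        # ... read hardware and call the appropriate update_* helper ...
        time.sleep(0.01)

t = threading.Thread(target=sensor_loop, daemon=True)
t.start()

# On shutdown: signal the event, then join with a timeout so a wedged
# hardware read cannot hang the whole process.
stop.set()
t.join(timeout=1.0)
```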
Step 4 — Sensor Threads: Fill Buffers
Each sensor thread runs a tight loop that reads data from hardware and calls the appropriate update_* helper. Every helper acquires the per-camera (or per-proprio) mutex, copies the data into the pre-allocated buffer, and releases the mutex — all in one atomic operation.
Buffer Update Reference
| Helper | Updates | When to call |
|---|---|---|
| `update_image` | Image pixel buffer | Every camera frame (e.g. 30 Hz) |
| `update_depth` | Depth map buffer | Every camera frame for depth-capable cameras |
| `update_intrinsics` | 3×3 camera matrix K | When focal length or zoom changes |
| `update_extrinsics` | 4×4 camera-to-world T | Every frame for any camera that moves (wrist, head, etc.) |
| `update_proprio` | Proprioception float vector | Every state read (e.g. 500 Hz for joint encoders) |
update_image
Writes new pixel data into the camera's image buffer.
```python
import time

def _camera_loop(self, name: str) -> None:
    while self._running:
        # Read a new frame from the camera hardware.
        frame = camera_driver.capture_rgb(name)  # np.ndarray (H, W, 3) uint8
        # update_image acquires the per-camera lock, copies 'frame' into
        # self.images[name] in-place, then releases the lock.
        self.update_image(name, frame)
        time.sleep(1 / 30)
```
```cpp
void camera_loop(std::size_t idx) {
  while (running_) {
    // Read a new frame from the camera hardware.
    // frame must contain exactly H*W*C bytes.
    std::vector<uint8_t> frame = camera_driver.capture_rgb(idx);
    // update_image acquires cam_mutexes_[idx], memcpy's frame into
    // images_[idx], then releases the mutex.
    update_image(idx, frame.data(), frame.size());
    std::this_thread::sleep_for(std::chrono::milliseconds(33));
  }
}
```
update_depth
Writes new depth data. Only valid for cameras declared with has_depth=True.
update_intrinsics
Updates the camera matrix K. Call this when the focal length changes (e.g. zoom lens, autofocus that shifts the principal point). For fixed lenses you can skip this entirely — the initial value from CameraConfig is used.
update_extrinsics
Updates the camera-to-world transform T. Call this every frame for any camera that moves with the robot arm (e.g. a wrist-mounted camera). The canonical pattern is to compute T from a forward kinematics (FK) solver using the current joint positions.
```python
def _camera_loop(self, name: str) -> None:
    while self._running:
        frame = camera_driver.capture_rgb(name)
        depth = camera_driver.capture_depth(name)
        # Compute camera-to-world transform from current joint state.
        # fk_solver.compute() returns a (4, 4) float64 ndarray.
        joint_pos = robot.read_joints()
        T = fk_solver.compute(joint_pos, camera_frame=name)
        self.update_image(name, frame)
        self.update_depth(name, depth)
        self.update_extrinsics(name, T)  # update last so K and T are consistent
        time.sleep(1 / 30)
```
```cpp
void camera_loop(std::size_t idx) {
  while (running_) {
    std::vector<uint8_t> frame = camera_driver.capture_rgb(idx);
    std::vector<uint8_t> depth = camera_driver.capture_depth(idx);
    // Forward kinematics: T is the camera-to-world homogeneous transform.
    Eigen::VectorXf joints = robot.read_joints();
    Eigen::Matrix4d T = fk_solver.compute(joints, configs_[idx].name);
    update_image(idx, frame.data(), frame.size());
    update_depth(idx, depth.data(), depth.size());
    update_extrinsics(idx, T);  // update last for consistency
    std::this_thread::sleep_for(std::chrono::milliseconds(33));
  }
}
```
Extrinsics convention
Chiral uses the camera-to-world convention: T maps a point in camera coordinates to world coordinates. This is the same convention used by most robotics FK solvers and NeRF-style representations. If your FK solver returns world-to-camera, take the inverse before passing to update_extrinsics.
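The inversion is a single call. For a rigid transform [R|t] the inverse is [Rᵀ|-Rᵀt], and `np.linalg.inv` of the homogeneous 4×4 matrix gives the same result; the numbers below are illustrative:

```python
import numpy as np

# Example world-to-camera transform, as some FK solvers return:
# a 90-degree rotation about z plus a translation.
T_wc = np.eye(4)
T_wc[:3, :3] = np.array([[0.0, -1.0, 0.0],
                         [1.0,  0.0, 0.0],
                         [0.0,  0.0, 1.0]])
T_wc[:3, 3] = [0.1, 0.2, 0.3]

# Chiral expects camera-to-world, so invert before calling update_extrinsics.
T_cw = np.linalg.inv(T_wc)
```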
update_proprio
Updates a proprioception buffer. The vector length must match the size declared in ProprioConfig.
Step 5 — Implement reset, get_obs, and apply_action
With the sensor threads running, reset(), get_obs(), and apply_action() are straightforward.
_make_obs() / make_obs() acquires each camera's mutex in declaration order, copies the four buffers (image, depth, intrinsics, extrinsics) into a new CameraInfo, releases the mutex, and moves on to the next camera. It then does the same for each proprio stream. The result is an Observation where all buffers represent a single coherent point in time.
```python
async def reset(self) -> tuple[chiral.Observation, dict]:
    # Reset any environment state (counters, randomisation, etc.)
    self._step_count = 0
    # Snapshot the current sensor state.
    obs = self._make_obs(timestamp=0.0)
    return obs, {}

async def get_obs(self) -> chiral.Observation:
    # Called by the client obs stream thread at a fixed Hz.
    # The default implementation just calls _make_obs(). Override to block
    # until a fresh camera frame has arrived, or to assemble the
    # observation from non-default sources.
    return self._make_obs(timestamp=self._step_count * 0.05)

async def apply_action(self, action: np.ndarray) -> None:
    # action is a (D,) float32 ndarray decoded from the client message.
    # Execute the action on the robot hardware; no response is sent.
    robot.apply_action(action)
    self._step_count += 1
```
Note: The C++ server still uses the legacy coupled `step()` API.
```cpp
std::pair<chiral::Observation, chiral::InfoMap> reset() override {
  step_ = 0;
  return {make_obs(0.0), {}};
}

chiral::StepResult step(const chiral::Action& action) override {
  robot.apply_action(action.data);
  chiral::StepResult r;
  r.obs = make_obs(step_ * 0.05);
  r.reward = 0.f;
  r.terminated = ++step_ >= 200;
  r.truncated = false;
  return r;
}
```
Timestamp conventions
The timestamp field is passed through to the client as-is. Use time.time() / std::chrono for wall-clock time, or a step counter multiplied by your control dt for episode time. The client can access it as obs.timestamp.
Step 6 — Start the Server
For blocking usage (the standard case), call run() from the main thread or from __main__:
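A sketch of the blocking entry point, assuming the MyServer class built up in Steps 1–5:

```python
def main() -> None:
    server = MyServer()  # MyServer as defined in the steps above
    server.run()         # blocks: serves until the process is interrupted
```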
For async contexts (e.g. if you already have an asyncio event loop), use await serve() instead:
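A sketch of the async variant, again assuming the MyServer class from the steps above:

```python
import asyncio

async def amain() -> None:
    server = MyServer()   # MyServer as defined in the steps above
    await server.serve()  # cooperative: runs inside the existing event loop

# From a synchronous entry point: asyncio.run(amain()).
# From an already-running loop: await amain().
```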
One client at a time
Chiral is designed for a one-to-one connection between a single server and a single policy client. There is no broadcast or multi-client fanout.
Alternative: Zenoh transport
Pass protocol="zenoh" to use Zenoh over TCP instead of WebSocket. All other code — camera_configs, reset, get_obs, apply_action, the update_* helpers — stays unchanged.
The default port for Zenoh is 7447. The WebSocket default (8765) is unchanged when protocol is omitted.
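Assuming a server class whose constructor forwards host, port, and protocol to the base class (as in the Step 3 example), switching transports might look like:

```python
def main_zenoh() -> None:
    # protocol="zenoh" selects Zenoh over TCP; 7447 is the Zenoh default port.
    server = MyServer(host="0.0.0.0", port=7447, protocol="zenoh")
    server.run()
```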
Exposing Metadata (Optional)
get_metadata() allows the server to expose static information before the episode starts: action shape, camera names, calibration constants, etc. The client calls it once after connecting.
InfoMap is string-only
The C++ InfoMap type is unordered_map<string, string>. Stringify any non-string metadata (numbers, lists) before inserting. The Python dict returned by get_metadata() can hold arbitrary msgpack-compatible values.
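A hypothetical get_metadata() override; the keys and values are illustrative, not part of the Chiral API:

```python
def get_metadata(self) -> dict:
    # Static facts the client may want before reset(); values must be
    # msgpack-serialisable. All keys here are made up for illustration.
    return {
        "action_dim": 7,
        "cameras": ["wrist_cam", "head_cam"],
        "control_hz": 20,
    }
```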
Complete Minimal Server
```python
import threading
import time

import numpy as np

import chiral

H, W, DOF = 480, 640, 7

class MinimalServer(chiral.PolicyServer):
    def camera_configs(self):
        return [chiral.CameraConfig(
            name="cam", height=H, width=W, channels=3,
            has_depth=True,
            intrinsics=np.array([[600, 0, 320], [0, 600, 240], [0, 0, 1]], dtype=np.float64),
            extrinsics=np.eye(4, dtype=np.float64),
        )]

    def proprio_configs(self):
        return [chiral.ProprioConfig(name="joint_pos", size=DOF)]

    def __init__(self):
        super().__init__()
        self._step = 0
        for name in self.images:
            threading.Thread(target=self._cam, args=(name,), daemon=True).start()
        for name in self.proprios:
            threading.Thread(target=self._prop, args=(name,), daemon=True).start()

    def _cam(self, name):
        while True:
            frame = np.zeros((H, W, 3), dtype=np.uint8)  # replace with hardware read
            depth = np.zeros((H, W), dtype=np.float32)   # replace with hardware read
            T = np.eye(4, dtype=np.float64)              # replace with FK
            self.update_image(name, frame)
            self.update_depth(name, depth)
            self.update_extrinsics(name, T)
            time.sleep(1 / 30)

    def _prop(self, name):
        while True:
            q = np.zeros(DOF, dtype=np.float32)  # replace with robot.read_joints()
            self.update_proprio(name, q)
            time.sleep(1 / 500)

    async def reset(self):
        self._step = 0
        return self._make_obs(), {}

    async def get_obs(self):
        return self._make_obs(timestamp=self._step * 0.05)

    async def apply_action(self, action):
        # robot.send_joint_command(action)
        self._step += 1

if __name__ == "__main__":
    MinimalServer().run()
```
```cpp
#include <chiral/server.hpp>
#include <Eigen/Dense>
#include <atomic>
#include <thread>

static constexpr int H = 480, W = 640, DOF = 7;

class MinimalServer : public chiral::PolicyServer {
  int step_{0};
  std::atomic<bool> running_{true};

 public:
  MinimalServer()
      : PolicyServer(
            [] {
              chiral::CameraConfig c;
              c.name = "cam"; c.height = H; c.width = W;
              c.channels = 3; c.has_depth = true;
              c.intrinsics << 600, 0, 320, 0, 600, 240, 0, 0, 1;
              c.extrinsics = Eigen::Matrix4d::Identity();
              return std::vector<chiral::CameraConfig>{c};
            }(),
            {{"joint_pos", DOF}},
            "0.0.0.0", 8765) {
    for (std::size_t i = 0; i < configs_.size(); ++i)
      std::thread(&MinimalServer::cam_loop, this, i).detach();
    for (std::size_t i = 0; i < proprio_configs_.size(); ++i)
      std::thread(&MinimalServer::prop_loop, this, i).detach();
  }

  ~MinimalServer() { running_ = false; }

  std::pair<chiral::Observation, chiral::InfoMap> reset() override {
    step_ = 0;
    return {make_obs(0.0), {}};
  }

  chiral::StepResult step(const chiral::Action&) override {
    chiral::StepResult r;
    r.obs = make_obs(step_ * 0.05);
    r.reward = 0.f;
    r.truncated = false;
    r.terminated = ++step_ >= 200;
    return r;
  }

 private:
  void cam_loop(std::size_t idx) {
    while (running_) {
      Eigen::Matrix4d T = Eigen::Matrix4d::Identity();  // replace with FK
      update_extrinsics(idx, T);
      std::this_thread::sleep_for(std::chrono::milliseconds(33));
    }
  }

  void prop_loop(std::size_t idx) {
    Eigen::VectorXf q = Eigen::VectorXf::Zero(proprio_configs_[idx].size);
    while (running_) {
      update_proprio(idx, q.data(), q.size());
      std::this_thread::sleep_for(std::chrono::milliseconds(2));
    }
  }
};

int main() { MinimalServer().run(); }
```