Python SDK
The adamo Python package is the full Adamo SDK — pub/sub, queries, liveliness, hardware-encoded video, and recorded-data access for training. Python 3.10+.
Install

    pip install adamo            # core: pub/sub, queries, liveliness
    pip install 'adamo[video]'   # adds iceoryx2 + numpy for video tracks
    pip install 'adamo[data]'    # adds pandas for DataFrame loading
    pip install 'adamo[ml]'      # adds torch + av for PyTorch datasets

The package builds from a Rust extension via maturin. Pre-built wheels are published for the supported platforms; on other platforms, pip compiles from source, which requires a Rust toolchain.
Top-Level API

    import adamo

Re-exports:

| Symbol | From | Description |
|---|---|---|
| connect | adamo.operate._config | Open a Session from an API key. |
| connect_async | adamo.operate._config | Async variant of connect. |
| Session | adamo.operate.session | The pub/sub / query / liveliness session. |
| Sample | adamo.operate.session | A received message. |
| Publisher | adamo.operate.session | A persistent publisher. |
| Subscriber | adamo.operate.session | An iterator over received samples. |
| Robot | adamo.session | A participant: combines pub/sub with hardware video. |
| Participant | adamo.session | Alias for Robot. |
| VideoTrack | adamo._video | Python-fed video track (lazy import). |
| data | adamo.data | Recorded-session client (lazy import). |
| operate | adamo.operate | The pub/sub submodule (lazy import). |

adamo.JointState, adamo.Joy, and adamo.JoystickCommand can also be imported with from adamo import …; they are re-exports from adamo.operate.control.
connect

    adamo.connect(
        *,
        api_key: str | None = None,
        token: str | None = None,
        org_id: str | None = None,
        api_url: str = "https://q14iirks46.execute-api.eu-west-2.amazonaws.com",
        protocol: str = "quic",
        mtls: bool = False,
    ) -> Session

Open an authenticated session. Provide an api_key for live SDK sessions.
Token auth is still used by browser/user flows, but token-based Python live sessions are not wired in the native transport yet.
| Arg | Description |
|---|---|
| api_key | An API key starting with ak_. Used for robots and scripts. |
| token | A Supabase JWT. Used for user-authenticated sessions. |
| org_id | Required only with token if the user belongs to multiple orgs. |
| api_url | Override the Adamo API base URL. |
| protocol | Transport: "quic" (default), "udp", or "tcp". |
| mtls | Use the mTLS QUIC path when routers require client certificates. |

Raises ValueError if both api_key and token are passed, or if neither is. Raises NotImplementedError for token-based native sessions.

    adamo.connect_async(*, api_key=..., token=..., org_id=..., api_url=..., protocol="quic", mtls=False) -> Session

Async variant: fetches the API config asynchronously, then opens a synchronous session. Returns a Session, not a coroutine of one (the underlying transport is sync).
Session

    class adamo.operate.Session

Every key you pass is automatically scoped to your organisation; you write {robot}/{topic} (e.g. my-arm/sensors/imu).
Session.org

    @property
    session.org -> str

The organisation slug resolved from the API key.
Session.put

    session.put(
        key_expr: str,
        payload: bytes | str,
        *,
        raw: bool = False,
        priority: Priority = Priority.DATA,
        express: bool = False,
        reliable: bool = False,
    ) -> None

One-shot publish. payload is encoded as UTF-8 if a str is passed. raw=True skips the org-scope prefix (useful for router-side endpoints).
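As a rough illustration of a one-shot publish, the sketch below sends a JSON reading through session.put. FakeSession is a hypothetical stand-in that only records calls so the snippet runs offline; with the real SDK you would pass the object returned by adamo.connect(api_key=...). The helper publish_imu, the key my-arm/sensors/imu, and the payload fields are made up for the example.

```python
import json


class FakeSession:
    """Hypothetical stand-in for a Session; it only records put() calls."""

    def __init__(self):
        self.sent = []

    def put(self, key_expr, payload, *, raw=False, express=False, reliable=False):
        # Mirror the documented behaviour: str payloads are encoded as UTF-8.
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.sent.append((key_expr, payload, reliable))


def publish_imu(session, robot: str, accel: list[float]) -> None:
    """Publish one IMU reading as JSON on {robot}/sensors/imu."""
    body = json.dumps({"accel": accel})
    session.put(f"{robot}/sensors/imu", body, reliable=True)


session = FakeSession()
publish_imu(session, "my-arm", [0.0, 0.0, 9.81])
print(session.sent[0][0])  # my-arm/sensors/imu
```

Because publish_imu only relies on put(key_expr, payload, reliable=...), the same function should work unchanged against a real Session.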
Session.publisher

    session.publisher(
        key_expr: str,
        *,
        raw: bool = False,
        priority: Priority = Priority.DATA,
        express: bool = False,
        reliable: bool = False,
    ) -> Publisher

Declare a persistent Publisher for repeated puts on the same key. The publisher is undeclared on .close() or context exit.
Session.subscribe

    session.subscribe(
        key_expr: str,
        *,
        raw: bool = False,
        callback: Callable[[Sample], None] | None = None,
    ) -> Subscriber

Declare a subscriber. Without callback, the returned Subscriber is iterable and supports non-blocking try_recv(). With callback, samples are delivered to it on the receive thread; the returned handle is used for lifecycle (close()).
Wildcards in key_expr:
| Pattern | Matches |
|---|---|
| my-arm/sensors/** | Everything under sensors/. |
| my-arm/sensors/* | One level under sensors/. |
| */sensors/imu | IMU from any robot. |

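
The wildcard rules in the table can be sketched as a small segment matcher. This is an illustration of the semantics described above, not the SDK's own matcher; the function name matches is hypothetical, and letting ** also match zero segments is an assumption.

```python
def matches(pattern: str, key: str) -> bool:
    """Return True if key fits pattern, where * is one segment and ** is any depth."""

    def walk(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "**":
            # Assumption: ** may consume zero or more whole segments.
            return any(walk(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return walk(rest, k[1:])
        return False

    return walk(pattern.split("/"), key.split("/"))


print(matches("my-arm/sensors/**", "my-arm/sensors/imu/accel"))  # True
print(matches("my-arm/sensors/*", "my-arm/sensors/imu/accel"))   # False
print(matches("*/sensors/imu", "other-arm/sensors/imu"))         # True
```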
Session.get

    session.get(
        key_expr: str,
        *,
        raw: bool = False,
        timeout_ms: int = 5000,
    ) -> list[Sample]

One-shot query. Collects every reply that arrives within timeout_ms. Returns an empty list if nothing replies.
Session.alive

    session.alive(token_key: str) -> LivelinessToken

Declare this client as alive at {token_key}/alive. The token stays active until you call .close() on it (or the process exits).
Session.live_tokens

    session.live_tokens(pattern: str = "**/alive") -> list[str]

Query currently-live tokens matching pattern. Returns prefix-stripped keys. The default pattern returns every live token in the org.
Session.on_liveliness

    session.on_liveliness(
        pattern: str = "**/alive",
        *,
        callback: Callable[[str, bool], None] | None = None,
        history: bool = True,
    ) -> Subscriber

Watch for liveliness changes. The callback fires with (key, is_alive) whenever a token appears or disappears. history=True delivers the current set of live tokens up front so the handler sees them on subscribe.
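One way to consume the (key, is_alive) events is to fold them into a set of live keys. The sketch below is a minimal example under assumptions: LiveSet is a hypothetical helper, the key names are invented, and the lock is there on the assumption that the callback runs on a background thread like other subscribe callbacks.

```python
import threading


class LiveSet:
    """Keeps the set of currently-live token keys from (key, is_alive) events."""

    def __init__(self):
        self._lock = threading.Lock()
        self._alive = set()

    def on_change(self, key: str, is_alive: bool) -> None:
        with self._lock:
            if is_alive:
                self._alive.add(key)
            else:
                self._alive.discard(key)

    def snapshot(self) -> set[str]:
        with self._lock:
            return set(self._alive)


tracker = LiveSet()
# With the SDK this would be wired as:
#   session.on_liveliness(callback=tracker.on_change)
# Here the events are fed by hand so the sketch runs offline.
tracker.on_change("my-arm/alive", True)
tracker.on_change("other-arm/alive", True)
tracker.on_change("my-arm/alive", False)
print(sorted(tracker.snapshot()))  # ['other-arm/alive']
```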
Session.close

    session.close() -> None

Close the underlying connection. Also called on exit from a with adamo.connect(…) as session: block.
Sample

    class adamo.Sample

A received message.

| Attribute | Type | Description |
|---|---|---|
| key | str | Topic key with the org scope stripped. |
| payload | bytes | Raw payload. |
| timestamp | Timestamp \| None | Sender timestamp, if the publisher set one. |

__repr__ shows key + payload size.
Publisher

    class adamo.Publisher

A persistent publisher tied to a single key.

    publisher.put(payload: bytes | str) -> None
    publisher.close() -> None

str payloads are encoded as UTF-8. Supports the context-manager protocol; exiting the context closes (undeclares) the publisher.
Subscriber

    class adamo.Subscriber

An iterator over received samples.

    for sample in subscriber:
        ...

    sample = subscriber.try_recv()   # Sample | None, non-blocking
    subscriber.close()               # undeclares the subscription

Supports the context-manager protocol.
Robot

    class adamo.Robot:
        def __init__(
            self,
            api_key: str | None = None,
            name: str | None = None,
            relay: str | None = None,
            protocol: str = "quic",
        ): ...

A participant: combines pub/sub messaging with hardware-encoded video. The same class covers three roles, defined by which methods you call:

- Leader / viewer (no camera): use publish / subscribe / send / recv.
- Follower robot (one or more cameras): use attach_video for each camera + subscribe for control.
- Perception pipeline (Python frame loop): use video() to get a VideoTrack, then track.send(frame) in a loop.

Participant is exposed as an alias of Robot for the leader / viewer case.
Properties

    robot.session -> Session   # opens the underlying Session lazily

Video attachments

    robot.attach_video(
        name: str,
        *,
        device: str | int | None = None,
        shm: str | None = None,
        pipeline: str | None = None,   # legacy GStreamer path; deprecated
        ros: str | None = None,        # legacy ROS bridge; deprecated
        codec: str = "h264",
        encoder: str | None = None,
        bitrate_kbps: int = 2000,
        fps: int = 30,
        width: int = 1280,
        height: int = 720,
        pixel_format: str | None = None,
        keyframe_distance: float = 2.0,
        stereo: bool = False,
    ) -> None

Attach a track driven by the SDK pipeline. Pass exactly one of device= (V4L2 device path or index), shm= (iceoryx2 service name produced by another process), pipeline= (legacy GStreamer string, deprecated), or ros= (legacy ROS topic, deprecated).

    robot.video(
        name: str,
        *,
        width: int = 1280,
        height: int = 720,
        pixel_format: str = "BGRA",
        codec: str = "h264",
        encoder: str | None = None,
        bitrate_kbps: int = 2000,
        fps: int = 30,
        keyframe_distance: float = 2.0,
        stereo: bool = False,
        intrinsics: list[float] | None = None,
        extrinsics: list[float] | None = None,
        depth_for: str | None = None,
    ) -> VideoTrack

Create a track whose frames you push from Python. Returns a VideoTrack; call .send(frame) with a numpy array (or bytes-like). The encoder pipeline auto-starts on the first send() if you haven’t called robot.run() yourself.
Pub/sub

    robot.publish(
        track: str,
        *,
        priority: int = 200,   # 0–255; ≥240 maps to REAL_TIME
        express: bool = False,
        reliable: bool = False,
    ) -> Publisher

Declare a publisher on {robot.name}/{track}. The numeric priority is mapped to one of 8 transport priority classes (see _to_zenoh_priority).

    robot.subscribe(
        broadcast: str,
        track: str | list[str],
        callback: Callable[..., None],
        *,
        priority: int = 200,
    ) -> Callable

Subscribe to one or more tracks on another participant’s broadcast. broadcast is the participant name (e.g. "xr-operator"); track can include * / ** wildcards or {name} capture groups. Named captures are passed to the callback as keyword arguments matching the parameter names.
Returns the callback unchanged so it works as a decorator too.

    @robot.on(
        broadcast: str,
        track: str | list[str],
        *,
        priority: int = 200,
        decode: Callable[[bytes], object] | str | None = "json",
    ) -> Callable

Decorator. decode controls payload transformation before the handler runs:

- "json" (default): json.loads(payload).
- "control": decode_control(payload) (returns JointState / Joy / JoystickCommand / dict).
- None: pass the raw bytes.
- Any callable: called with raw bytes; its return value is passed in.

Messaging (back-channel)

    robot.send(channel: str, data: bytes | str) -> None

Publish a message on {robot.name}/msg/{channel}. str data is encoded as UTF-8.

    robot.recv(timeout: float | None = None) -> tuple[str, bytes]

Block until a message arrives on any msg/* channel from another participant. Returns (channel, data). Raises TimeoutError if timeout elapses.

    robot.on_message(callback: Callable[[str, bytes], None]) -> Callable

Register a callback for incoming messages. Can be used as a decorator. Multiple callbacks can be registered.
Logging

    robot.log(message: str, *, level: str = "info") -> None

Publish a log line from this robot. The web operator console subscribes to this stream and renders the log lines in real time.
level is a free-form string; the standard values "info", "warn", "error", "debug" are rendered with colour by the built-in UI. Messages are truncated at 10,000 characters.

    import adamo

    robot = adamo.Robot(api_key="ak_...", name="my-robot")
    robot.log("booted and attached camera")
    robot.log("encoder dropped a frame", level="warn")
    try:
        do_stuff()   # stands in for your application code
    except Exception as e:
        robot.log(f"pipeline failed: {e}", level="error")

Each log entry is stamped with the fabric clock so lines from multiple robots stay ordered in the operator view.
Lifecycle

    robot.run() -> None

Block until the session ends. If video tracks are attached, drives the encoder pipeline. Otherwise sleeps until Ctrl+C.

    robot.close() -> None

Tear down all subscribers, publishers, video tracks, and the session. Supports the context-manager protocol.
VideoTrack

    from adamo import VideoTrack

Returned from robot.video(). Push frames to feed the encoder.

| Attribute / method | Type | Description |
|---|---|---|
| name | str | Track name. |
| width, height | int | Frame dimensions. |
| pixel_format | str | Uppercased format string. |
| service_name | str | iceoryx2 service backing the track. |
| send(frame) | None | Push one frame (numpy array or bytes-like). Length must match width × height × bytes-per-pixel for the format. |
| close() | None | Mark the track closed; subsequent send() raises RuntimeError. |

Supported pixel_format values:
| Format | Bytes per pixel |
|---|---|
| BGRA, RGBA, BGRX, RGBX | 4 |
| RGB, BGR | 3 |
| YUY2, UYVY | 2 |
| I420, NV12 | 1.5 |

Non-contiguous numpy arrays are copied to contiguous before sending.
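Since send() expects width × height × bytes-per-pixel bytes, it can help to check a buffer's length before pushing it. The sketch below computes the expected size from the table above; frame_nbytes is a hypothetical helper, not part of the SDK, and it assumes even frame dimensions for the 1.5-byte formats.

```python
# Bytes per pixel stored in halves so that 1.5 stays exact in integer math.
_HALF_BYTES_PER_PIXEL = {
    "BGRA": 8, "RGBA": 8, "BGRX": 8, "RGBX": 8,
    "RGB": 6, "BGR": 6,
    "YUY2": 4, "UYVY": 4,
    "I420": 3, "NV12": 3,
}


def frame_nbytes(width: int, height: int, pixel_format: str) -> int:
    """Expected buffer length in bytes for one frame of the given format."""
    fmt = pixel_format.upper()
    if fmt not in _HALF_BYTES_PER_PIXEL:
        raise ValueError(f"unsupported pixel format: {pixel_format!r}")
    return width * height * _HALF_BYTES_PER_PIXEL[fmt] // 2


print(frame_nbytes(1280, 720, "BGRA"))  # 3686400
print(frame_nbytes(1280, 720, "nv12"))  # 1382400
```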
Control message types

    from adamo.operate.control import JointState, Joy, JoystickCommand, decode_control

JSON-serialisable dataclasses for the most common control payloads. Each has a .to_json() -> bytes method that fills stamp with time.time() if it’s zero.

    from dataclasses import dataclass, field

    @dataclass
    class JointState:
        names: list[str] = field(default_factory=list)
        positions: list[float] = field(default_factory=list)
        velocity: list[float] = field(default_factory=list)
        effort: list[float] = field(default_factory=list)
        stamp: float = 0.0
        frame_id: str = ""

    @dataclass
    class Joy:
        axes: list[float] = field(default_factory=list)
        buttons: list[int] = field(default_factory=list)
        stamp: float = 0.0

    @dataclass
    class JoystickCommand:
        sequence_id: int = 0
        axes: list[float] = field(default_factory=list)
        buttons: list[int] = field(default_factory=list)
        stamp: float = 0.0

    decode_control(payload: bytes) -> JointState | Joy | JoystickCommand | dict

Decodes a JSON payload by inspecting its type field. Unknown types come back as a raw dict.
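The stamp-filling and type-dispatch behaviour can be pictured with a stand-in class. Everything here is hypothetical: JoySketch and decode_sketch are not SDK names, and the "joy" value of the type field is an assumed tag, since the source does not list the actual tag values.

```python
import json
import time
from dataclasses import asdict, dataclass, field


@dataclass
class JoySketch:
    """Hypothetical stand-in for a control message; not the SDK class."""

    axes: list[float] = field(default_factory=list)
    buttons: list[int] = field(default_factory=list)
    stamp: float = 0.0

    def to_json(self) -> bytes:
        body = asdict(self)
        if body["stamp"] == 0.0:
            body["stamp"] = time.time()  # fill the stamp only when unset
        body["type"] = "joy"             # assumed tag value
        return json.dumps(body).encode("utf-8")


def decode_sketch(payload: bytes):
    """Dispatch on the type field; unknown types come back as a plain dict."""
    body = json.loads(payload)
    if body.get("type") == "joy":
        return JoySketch(axes=body["axes"], buttons=body["buttons"], stamp=body["stamp"])
    return body


msg = decode_sketch(JoySketch(axes=[0.5, -0.25], buttons=[1, 0]).to_json())
print(type(msg).__name__, msg.axes)
```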
Priority mapping
Robot.publish(priority=…) accepts a 0–255 integer that’s mapped to the 8-class priority enum:

| Range | Class |
|---|---|
| 240–255 | REAL_TIME |
| 210–239 | INTERACTIVE_HIGH |
| 180–209 | INTERACTIVE_LOW |
| 140–179 | DATA_HIGH |
| 100–139 | DATA |
| 60–99 | DATA_LOW |
| 0–59 | BACKGROUND |
Higher numeric values drain ahead of lower ones under congestion.
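
The ranges in the table can be written as a lookup, which is handy for checking which class a given number lands in. This is a sketch of the table only, not the SDK's _to_zenoh_priority; priority_class is a hypothetical name, and rejecting values outside 0–255 is an assumption.

```python
# Lower bound of each range from the table, highest first.
_RANGES = [
    (240, "REAL_TIME"),
    (210, "INTERACTIVE_HIGH"),
    (180, "INTERACTIVE_LOW"),
    (140, "DATA_HIGH"),
    (100, "DATA"),
    (60, "DATA_LOW"),
    (0, "BACKGROUND"),
]


def priority_class(value: int) -> str:
    """Map a 0-255 priority to the class name listed in the table."""
    if not 0 <= value <= 255:
        raise ValueError(f"priority must be in 0..255, got {value}")
    for lower, name in _RANGES:
        if value >= lower:
            return name
    raise AssertionError("unreachable: 0 is the lowest bound")


print(priority_class(200))  # INTERACTIVE_LOW
print(priority_class(240))  # REAL_TIME
```

Under this mapping the default priority=200 falls in the INTERACTIVE_LOW range.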
Downloading Data
The adamo.data submodule connects to the recorded-data store rather than the live network. Use it to download recorded sessions, browse topics, decode video, build temporally-aligned tables, and produce PyTorch datasets for training.
This module is Python only — there is no equivalent in the Rust or C SDKs.

    from adamo.data import connect, DataClient, AdamoDataset, SessionMetadata, Record, VideoIndex, Frame

connect

    adamo.data.connect(
        *,
        api_key: str,
        api_url: str = "...",
        store_url: str = "https://store.adamohq.com",
    ) -> DataClient

Open a DataClient. Exchanges the API key for a short-lived JWT internally and refreshes when it expires.
DataClient

    class adamo.data.DataClient

Sessions

    client.list_sessions(
        *,
        after: str | float | datetime | None = None,
        before: str | float | datetime | None = None,
        name_contains: str | None = None,
    ) -> list[SessionMetadata]

    client.get_session(session_id: str) -> SessionMetadata
    client.get_topics(session_id: str) -> list[str]
    client.match_topics(session_id: str, pattern: str) -> list[str]
    client.message_count(session_id: str) -> int

pattern in match_topics supports * (single segment) and ** (any depth).
Records

    client.query_records(
        session_id: str,
        *topics: str,
        start: str | float | datetime | None = None,
        end: str | float | datetime | None = None,
    ) -> list[Record]

    client.iter_records(...) -> Iterator[Record]   # streaming
    client.export_records(..., chunk_size: int = 1000) -> Iterator[list[Record]]

*topics supports wildcards. When no topics are given, records from every topic in the session are returned.
Aligned queries

    client.aligned(
        session_id: str,
        *topics: str,
        start: ... = None,
        end: ... = None,
        window_ms: int = 50,
        hz: float | None = None,
    ) -> list[dict[str, Record]]

Temporally aligns records across two or more topic patterns. Returns one dict per timestep, mapping topic name to the matched Record. If hz is set, resamples to that frequency; otherwise aligns to the first topic’s timestamps.
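For intuition about what aligning to the first topic's timestamps means, the sketch below pairs each reference timestamp with the nearest timestamp on every other topic. It is a toy model, not the SDK's algorithm: align is a hypothetical function, nearest-neighbour matching within ±window_ms is an assumption, bare timestamps stand in for Records, and hz resampling is not shown.

```python
from bisect import bisect_left


def align(reference: list[float], others: dict[str, list[float]], window_ms: int = 50):
    """Pair each reference timestamp with the nearest one per topic.

    reference and every list in others hold sorted epoch-second timestamps.
    A topic is left out of a row when nothing lies within window_ms.
    """
    window_s = window_ms / 1000.0
    rows = []
    for t in reference:
        row = {}
        for topic, stamps in others.items():
            i = bisect_left(stamps, t)
            # Only the neighbours on either side of t can be the nearest.
            candidates = stamps[max(i - 1, 0): i + 1]
            if not candidates:
                continue
            nearest = min(candidates, key=lambda s: abs(s - t))
            if abs(nearest - t) <= window_s:
                row[topic] = nearest
        rows.append((t, row))
    return rows


rows = align([0.0, 0.1, 0.2], {"imu": [0.01, 0.09, 0.35]})
for t, row in rows:
    print(t, row)
```

In this run the third reference timestamp gets an empty row, because the closest imu sample is more than 50 ms away.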

    client.video_index(session_id: str, topic: str) -> VideoIndex
    client.download_video(session_id: str, topic: str, output_path: str | Path) -> Path
    client.iter_frames(
        session_id: str,
        topic: str,
        *,
        start: ... = None,
        end: ... = None,
        size: tuple[int, int] | None = None,
    ) -> Iterator[Frame]

iter_frames requires pip install av numpy. Each Frame.image is a (H, W, 3) uint8 RGB numpy array; size=(w, h) resizes.
Episodes

    client.episodes(
        *topics: str,
        sessions: list[str] | None = None,
        window_ms: int = 50,
    ) -> Iterator[dict[str, list[Record]]]

Iterate sessions as trajectory-level episodes. Each yielded dict maps topic name to a time-ordered list of Records.
DataFrame

    client.to_dataframe(
        session_id: str,
        *topics: str,
        start: ... = None,
        end: ... = None,
    )

Returns a pandas DataFrame with columns session_id, topic, timestamp, payload. Requires pip install adamo[data].
Dataset

    client.dataset(
        sessions: list,
        observation: dict[str, str | tuple[str, str]],
        action: str | tuple[str, str],
        obs_steps: int = 1,
        action_steps: int = 1,
        hz: float = 30.0,
        image_size: tuple[int, int] | None = None,
    ) -> AdamoDataset

Build a PyTorch-compatible dataset. observation maps user keys to topic specs:

- "topic/path": auto-detects video vs. raw bytes.
- ("topic/pattern", "field_name"): JSON-decode payload, extract that field as a float array.

action takes the same form. Wildcards resolve automatically; if a pattern matches multiple topics the first is used and a warning is emitted.
Lifecycle

    client.close() -> None

Supports the context-manager protocol.
AdamoDataset

    class adamo.data.AdamoDataset

A PyTorch-compatible dataset. All download / decode happens at construction; __getitem__ is O(1).

    dataset[i] -> dict[str, torch.Tensor]

Each sample has dot-separated keys:

- "observation.<key>" for each entry in observation, shape (obs_steps, …).
- "action", shape (action_steps, …).

Video observations are float32 in [0, 1] with shape (obs_steps, 3, H, W). Numeric observations are (obs_steps, D).

    dataset.stats: dict[str, dict[str, np.ndarray]]
    # dataset.stats["observation.state"] = {"mean": …, "std": …}

Per-key mean and std for all non-video keys (plus the action key).
Data models

    @dataclass
    class SessionMetadata:
        id: str
        name: str
        status: str
        topics: list[str]
        started_at: float | None   # epoch seconds
        stopped_at: float | None
        message_count: int
        org_id: str

    @dataclass
    class Record:
        session_id: str
        topic: str
        payload: bytes
        timestamp: float   # epoch seconds

    @dataclass
    class VideoIndex:
        session_id: str
        topic: str
        frame_count: int
        keyframe_count: int
        duration_ms: int
        avg_fps: float
        segments: list[dict]

    @dataclass
    class Frame:
        topic: str
        timestamp: float
        image: object   # (H, W, 3) uint8 RGB numpy array

        @property
        def height(self) -> int: ...
        @property
        def width(self) -> int: ...

Concurrency model
- Session methods are synchronous. connect_async is the only async surface and only the API config fetch is async; the returned Session is the same sync object as connect.
- Subscribe callbacks run on a background receive thread. Keep them short. For heavy work, push to a queue and process from your main thread.
- The video pipeline runs in its own threads. Robot.run() blocks the calling thread driving it; track.send() is thread-safe.
- Robot.recv() blocks the calling thread until a msg/* message arrives.

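The queue hand-off suggested for subscribe callbacks might look like the following. It is a minimal sketch under assumptions: on_sample and drain are hypothetical names, raw bytes stand in for the Sample objects a real callback would receive, a plain thread simulates the SDK's receive thread, and dropping on a full queue is one possible policy rather than a recommendation from the source.

```python
import queue
import threading

inbox: queue.Queue = queue.Queue(maxsize=256)
dropped = 0


def on_sample(payload: bytes) -> None:
    """Callback body: hand the payload off and return immediately."""
    global dropped
    try:
        inbox.put_nowait(payload)
    except queue.Full:
        dropped += 1  # shed load rather than stall the receive thread


def drain(handler) -> int:
    """Run handler on everything queued so far; return how many were handled."""
    count = 0
    while True:
        try:
            item = inbox.get_nowait()
        except queue.Empty:
            return count
        handler(item)
        count += 1


# Simulate the receive thread delivering five samples.
receiver = threading.Thread(target=lambda: [on_sample(bytes([i])) for i in range(5)])
receiver.start()
receiver.join()

seen = []
processed = drain(seen.append)  # the heavy work would happen here, on the main thread
print(processed, dropped)  # 5 0
```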
Errors
The SDK raises ValueError for bad arguments (mutually exclusive options, malformed specs), RuntimeError for state errors (closed track / session without API key), TimeoutError from Robot.recv(timeout=…), and ImportError from optional features when their extras aren’t installed.
The transport layer raises its own exceptions for connection / publish / subscribe failures; those propagate as-is.
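To avoid hitting an ImportError halfway through a run, a script can check up front which optional dependencies are importable. The sketch below uses only the standard library; the mapping from extras to module names follows the install notes, but treating each package name as its import name (for example iceoryx2) is an assumption, and missing_modules is a hypothetical helper.

```python
import importlib.util

# Extras and the packages they are documented to add (import names assumed).
EXTRAS = {
    "video": ["iceoryx2", "numpy"],
    "data": ["pandas"],
    "ml": ["torch", "av"],
}


def missing_modules() -> dict[str, list[str]]:
    """Return, per extra, the modules that cannot be found in this environment."""
    return {
        extra: [m for m in modules if importlib.util.find_spec(m) is None]
        for extra, modules in EXTRAS.items()
    }


for extra, missing in missing_modules().items():
    status = "ok" if not missing else "missing " + ", ".join(missing)
    print(f"adamo[{extra}]: {status}")
```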