Python SDK

The adamo Python package is the full Adamo SDK: pub/sub, queries, liveliness, hardware-encoded video, and recorded-data access for training. Requires Python 3.10+.

pip install adamo # core: pub/sub, queries, liveliness
pip install 'adamo[video]' # adds iceoryx2 + numpy for video tracks
pip install 'adamo[data]' # adds pandas for DataFrame loading
pip install 'adamo[ml]' # adds torch + av for PyTorch datasets

The package is built from a Rust extension via maturin. Pre-built wheels are published for the supported platforms; on other platforms, pip compiles from source, which requires a Rust toolchain.

import adamo

Re-exports:

| Symbol | From | Description |
| --- | --- | --- |
| connect | adamo.operate._config | Open a Session from an API key. |
| connect_async | adamo.operate._config | Async variant of connect. |
| Session | adamo.operate.session | The pub/sub / query / liveliness session. |
| Sample | adamo.operate.session | A received message. |
| Publisher | adamo.operate.session | A persistent publisher. |
| Subscriber | adamo.operate.session | An iterator over received samples. |
| Robot | adamo.session | A participant; combines pub/sub with hardware video. |
| Participant | adamo.session | Alias for Robot. |
| VideoTrack | adamo._video | Python-fed video track (lazy import). |
| data | adamo.data | Recorded-session client (lazy import). |
| operate | adamo.operate | The pub/sub submodule (lazy import). |

adamo.JointState, adamo.Joy, adamo.JoystickCommand are exposed via from adamo import … (re-exports of adamo.operate.control).


adamo.connect(
    *,
    api_key: str | None = None,
    token: str | None = None,
    org_id: str | None = None,
    api_url: str = "https://q14iirks46.execute-api.eu-west-2.amazonaws.com",
    protocol: str = "quic",
    mtls: bool = False,
) -> Session

Open an authenticated session. Provide an api_key for live SDK sessions. Token auth is still used by browser/user flows, but token-based Python live sessions are not wired in the native transport yet.

| Arg | Description |
| --- | --- |
| api_key | An API key starting with ak_. Used for robots and scripts. |
| token | A Supabase JWT. Used for user-authenticated sessions. |
| org_id | Required only with token if the user belongs to multiple orgs. |
| api_url | Override the Adamo API base URL. |
| protocol | Transport: "quic" (default), "udp", or "tcp". |
| mtls | Use the mTLS QUIC path when routers require client certificates. |

Raises ValueError if both api_key and token are passed, or if neither is. Raises NotImplementedError for token-based native sessions.

adamo.connect_async(*, api_key=..., token=..., org_id=..., api_url=..., protocol="quic", mtls=False) -> Session

Async variant — does the API config fetch asynchronously, then opens a synchronous session. Returns a Session, not a coroutine of one (the underlying transport is sync).


class adamo.operate.Session

Every key you pass is automatically scoped to your organisation; you write {robot}/{topic} (e.g. my-arm/sensors/imu).

@property
session.org -> str

The organisation slug resolved from the API key.

session.put(
    key_expr: str,
    payload: bytes | str,
    *,
    raw: bool = False,
    priority: Priority = Priority.DATA,
    express: bool = False,
    reliable: bool = False,
) -> None

One-shot publish. payload is encoded as UTF-8 if a str is passed. raw=True skips the org-scope prefix (useful for router-side endpoints).

session.publisher(
    key_expr: str,
    *,
    raw: bool = False,
    priority: Priority = Priority.DATA,
    express: bool = False,
    reliable: bool = False,
) -> Publisher

Declare a persistent Publisher for repeated puts on the same key. The publisher is undeclared on .close() or context exit.
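
A minimal sketch of the persistent-publisher pattern described above. It takes an already-open session as a parameter, so it only relies on publisher(), put(), and the context-manager behaviour documented here. The helper name publish_readings, the sensors/imu topic, and the JSON encoding of each reading are assumptions made for illustration, not part of the SDK.

```python
import json


def publish_readings(session, robot: str, readings) -> int:
    """Publish each reading as JSON on {robot}/sensors/imu; return the count sent.

    `session` is any object exposing publisher(key_expr), as documented above.
    """
    sent = 0
    # Leaving the with-block undeclares the publisher, even if put() raises.
    with session.publisher(f"{robot}/sensors/imu") as pub:
        for reading in readings:
            pub.put(json.dumps(reading))  # str payloads are encoded as UTF-8
            sent += 1
    return sent
```

For a single message, session.put() is simpler; a declared publisher is likely the better fit when the same key is written repeatedly.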

session.subscribe(
    key_expr: str,
    *,
    raw: bool = False,
    callback: Callable[[Sample], None] | None = None,
) -> Subscriber

Declare a subscriber. Without callback, the returned Subscriber is iterable and supports non-blocking try_recv(). With callback, samples are delivered to it on the receive thread; the returned handle is used for lifecycle (close()).

Wildcards in key_expr:

| Pattern | Matches |
| --- | --- |
| my-arm/sensors/** | Everything under sensors/. |
| my-arm/sensors/* | One level under sensors/. |
| */sensors/imu | IMU from any robot. |
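
To make the wildcard semantics concrete, here is a small pure-Python matcher. It is an illustrative sketch, not the SDK's matching code: key_matches is a hypothetical helper, and it assumes that ** may match zero or more segments while * matches exactly one.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Return True if `key` matches `pattern`, compared segment by segment."""

    def match(p: list, k: list) -> bool:
        if not p:
            return not k  # pattern exhausted: match only if key is too
        if p[0] == "**":
            # Assumption: ** consumes zero or more key segments.
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        # * matches any single segment; otherwise segments must be equal.
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])

    return match(pattern.split("/"), key.split("/"))
```

With this sketch, my-arm/sensors/* accepts my-arm/sensors/imu but rejects my-arm/sensors/imu/raw, which only the ** form accepts.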

session.get(
    key_expr: str,
    *,
    raw: bool = False,
    timeout_ms: int = 5000,
) -> list[Sample]

One-shot query. Collects every reply that arrives within timeout_ms. Returns an empty list if nothing replies.

session.alive(token_key: str) -> LivelinessToken

Declare this client as alive at {token_key}/alive. The token stays active until you call .close() on it (or the process exits).

session.live_tokens(pattern: str = "**/alive") -> list[str]

Query currently-live tokens matching pattern. Returns prefix-stripped keys. Default returns every live token in the org.

session.on_liveliness(
    pattern: str = "**/alive",
    *,
    callback: Callable[[str, bool], None] | None = None,
    history: bool = True,
) -> Subscriber

Watch for liveliness changes. The callback fires with (key, is_alive) whenever a token appears or disappears. history=True delivers the current set of live tokens up front so the handler sees them on subscribe.
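
One way to use the (key, is_alive) callback is to keep a set of currently-live keys up to date. The sketch below shows that bookkeeping in isolation; make_liveliness_tracker is a hypothetical helper, not an SDK function.

```python
def make_liveliness_tracker():
    """Return (callback, live): `live` is a set kept current by `callback`."""
    live: set[str] = set()

    def on_change(key: str, is_alive: bool) -> None:
        if is_alive:
            live.add(key)
        else:
            # discard() tolerates a "gone" event for a key we never saw.
            live.discard(key)

    return on_change, live
```

The returned callback could then be passed as session.on_liveliness(callback=on_change). If the set is read from another thread, guarding it with a lock may be advisable.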

session.close() -> None

Close the underlying connection. Also reachable via with adamo.connect(…) as session:.


class adamo.Sample

A received message.

| Attribute | Type | Description |
| --- | --- | --- |
| key | str | Topic key with the org scope stripped. |
| payload | bytes | Raw payload. |
| timestamp | Timestamp or None | Sender timestamp, if the publisher set one. |

__repr__ shows key + payload size.


class adamo.Publisher

A persistent publisher tied to a single key.

publisher.put(payload: bytes | str) -> None
publisher.close() -> None

str payloads are encoded as UTF-8. Supports the context-manager protocol — exit closes (undeclares) the publisher.


class adamo.Subscriber

An iterator over received samples.

for sample in subscriber:
    ...

sample = subscriber.try_recv()  # Sample | None, non-blocking
subscriber.close()              # undeclares the subscription

Supports the context-manager protocol.


class adamo.Robot:
    def __init__(
        self,
        api_key: str | None = None,
        name: str | None = None,
        relay: str | None = None,
        protocol: str = "quic",
    ): ...

A participant — combines pub/sub messaging with hardware-encoded video. The same class covers three roles, defined by which methods you call:

  • Leader / viewer (no camera): use publish / subscribe / send / recv.
  • Follower robot (one or more cameras): use attach_video for each camera + subscribe for control.
  • Perception pipeline (Python frame loop): use video() to get a VideoTrack, then track.send(frame) in a loop.

Participant is exposed as an alias of Robot for the leader / viewer case.

robot.session -> Session  # opens the underlying Session lazily

robot.attach_video(
    name: str,
    *,
    device: str | int | None = None,
    shm: str | None = None,
    pipeline: str | None = None,  # legacy GStreamer path; deprecated
    ros: str | None = None,       # legacy ROS bridge; deprecated
    codec: str = "h264",
    encoder: str | None = None,
    bitrate_kbps: int = 2000,
    fps: int = 30,
    width: int = 1280,
    height: int = 720,
    pixel_format: str | None = None,
    keyframe_distance: float = 2.0,
    stereo: bool = False,
) -> None

Attach a track driven by the SDK pipeline. Pass exactly one of device= (V4L2 device path or index), shm= (iceoryx2 service name produced by another process), pipeline= (legacy GStreamer string — deprecated), or ros= (legacy ROS topic — deprecated).

robot.video(
    name: str,
    *,
    width: int = 1280,
    height: int = 720,
    pixel_format: str = "BGRA",
    codec: str = "h264",
    encoder: str | None = None,
    bitrate_kbps: int = 2000,
    fps: int = 30,
    keyframe_distance: float = 2.0,
    stereo: bool = False,
    intrinsics: list[float] | None = None,
    extrinsics: list[float] | None = None,
    depth_for: str | None = None,
) -> VideoTrack

Create a track whose frames you push from Python. Returns a VideoTrack; call .send(frame) with a numpy array (or bytes-like). The encoder pipeline auto-starts on the first send() if you haven’t called robot.run() yourself.

robot.publish(
    track: str,
    *,
    priority: int = 200,  # 0–255; ≥240 maps to REAL_TIME
    express: bool = False,
    reliable: bool = False,
) -> Publisher

Declare a publisher on {robot.name}/{track}. The numeric priority is mapped to one of 8 transport priority classes (see _to_zenoh_priority).

robot.subscribe(
    broadcast: str,
    track: str | list[str],
    callback: Callable[..., None],
    *,
    priority: int = 200,
) -> Callable

Subscribe to one or more tracks on another participant’s broadcast. broadcast is the participant name (e.g. "xr-operator"); track can include * / ** wildcards or {name} capture groups. Named captures are passed to the callback as keyword arguments matching the parameter names.

Returns the callback unchanged so it works as a decorator too.
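
The named-capture behaviour can be pictured with a small regex-based sketch. This is not how the SDK implements it: compile_track_pattern and dispatch are hypothetical helpers, the sketch handles only * and {name} (not **), and it assumes each capture spans exactly one path segment and that the payload is passed as the first positional argument.

```python
import re


def compile_track_pattern(pattern: str) -> re.Pattern:
    """Translate a track pattern into a regex with one named group per {name}."""
    parts = []
    for seg in pattern.split("/"):
        if seg == "*":
            parts.append(r"[^/]+")  # any single segment, not captured
        elif seg.startswith("{") and seg.endswith("}"):
            parts.append(f"(?P<{seg[1:-1]}>[^/]+)")  # captured single segment
        else:
            parts.append(re.escape(seg))  # literal segment
    return re.compile("/".join(parts))


def dispatch(pattern: str, key: str, payload: bytes, callback) -> bool:
    """Call callback(payload, **captures) if key matches; report whether it did."""
    m = compile_track_pattern(pattern).fullmatch(key)
    if m is None:
        return False
    callback(payload, **m.groupdict())
    return True
```

Under these assumptions, a handler declared as def handler(payload, joint) subscribed to joints/{joint}/cmd would receive joint="elbow" for the key joints/elbow/cmd.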

@robot.on(
    broadcast: str,
    track: str | list[str],
    *,
    priority: int = 200,
    decode: Callable[[bytes], object] | str | None = "json",
) -> Callable

Decorator. decode controls payload transformation before the handler runs:

  • "json" (default): json.loads(payload).
  • "control": decode_control(payload) (returns JointState / Joy / JoystickCommand / dict).
  • None: pass the raw bytes.
  • Any callable: called with raw bytes; its return value is passed in.

robot.send(channel: str, data: bytes | str) -> None

Publish a message on {robot.name}/msg/{channel}. str data is encoded as UTF-8.

robot.recv(timeout: float | None = None) -> tuple[str, bytes]

Block until a message arrives on any msg/* channel from another participant. Returns (channel, data). Raises TimeoutError if timeout elapses.
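
A short sketch of a receive loop built on the documented recv() contract: it returns (channel, data) and raises TimeoutError when the timeout elapses. drain_messages and its limit parameter are hypothetical, and the robot argument is any object with a compatible recv().

```python
def drain_messages(robot, *, timeout: float = 0.5, limit: int = 100) -> list:
    """Collect (channel, data) pairs until recv() times out or `limit` is hit."""
    messages = []
    while len(messages) < limit:
        try:
            messages.append(robot.recv(timeout=timeout))
        except TimeoutError:
            break  # nothing arrived before the timeout elapsed
    return messages
```

The limit keeps the loop bounded when messages arrive faster than they are consumed.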

robot.on_message(callback: Callable[[str, bytes], None]) -> Callable

Register a callback for incoming messages. Can be used as a decorator. Multiple callbacks can be registered.

robot.log(message: str, *, level: str = "info") -> None

Publish a log line from this robot. The web operator console subscribes to this stream and renders the log lines in real time.

level is a free-form string; the standard values "info", "warn", "error", "debug" are rendered with colour by the built-in UI. Messages are truncated at 10,000 characters.

import adamo

robot = adamo.Robot(api_key="ak_...", name="my-robot")
robot.log("booted and attached camera")
robot.log("encoder dropped a frame", level="warn")

try:
    do_stuff()  # placeholder for your own workload
except Exception as e:
    robot.log(f"pipeline failed: {e}", level="error")

Each log entry is stamped with the fabric clock so lines from multiple robots stay ordered in the operator view.

robot.run() -> None

Block until the session ends. If video tracks are attached, drives the encoder pipeline. Otherwise sleeps until Ctrl+C.

robot.close() -> None

Tear down all subscribers, publishers, video tracks, and the session. Supports the context-manager protocol.


from adamo import VideoTrack

Returned from robot.video(). Push frames to feed the encoder.

| Attribute / method | Type | Description |
| --- | --- | --- |
| name | str | Track name. |
| width, height | int | Frame dimensions. |
| pixel_format | str | Uppercased format string. |
| service_name | str | iceoryx2 service backing the track. |
| send(frame) | None | Push one frame (numpy array or bytes-like). Length must match width × height × bytes-per-pixel for the format. |
| close() | None | Mark the track closed; subsequent send() raises RuntimeError. |

Supported pixel_format values:

| Format | Bytes per pixel |
| --- | --- |
| BGRA, RGBA, BGRX, RGBX | 4 |
| RGB, BGR | 3 |
| YUY2, UYVY | 2 |
| I420, NV12 | 1.5 |

Non-contiguous numpy arrays are copied to contiguous before sending.
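
The length rule for send() (width × height × bytes-per-pixel) can be checked before pushing a frame. The sketch below encodes the table above; expected_frame_bytes is a hypothetical helper, and rejecting sizes that are not a whole number of bytes is an assumption of this sketch rather than documented SDK behaviour.

```python
from fractions import Fraction

# Bytes per pixel, taken from the table above. Fractions keep 1.5 exact.
BYTES_PER_PIXEL = {
    "BGRA": Fraction(4), "RGBA": Fraction(4),
    "BGRX": Fraction(4), "RGBX": Fraction(4),
    "RGB": Fraction(3), "BGR": Fraction(3),
    "YUY2": Fraction(2), "UYVY": Fraction(2),
    "I420": Fraction(3, 2), "NV12": Fraction(3, 2),
}


def expected_frame_bytes(width: int, height: int, pixel_format: str) -> int:
    """Return the buffer length a frame of this geometry and format should have."""
    try:
        bpp = BYTES_PER_PIXEL[pixel_format.upper()]
    except KeyError:
        raise ValueError(f"unsupported pixel format: {pixel_format!r}") from None
    size = width * height * bpp
    if size.denominator != 1:
        raise ValueError("frame size is not a whole number of bytes")
    return int(size)
```

For the default 1280 × 720 BGRA track this gives 3,686,400 bytes per frame, and 1,382,400 bytes for I420 at the same resolution.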


from adamo.operate.control import JointState, Joy, JoystickCommand, decode_control

JSON-serialisable dataclasses for the most common control payloads. Each has a .to_json() -> bytes method that fills stamp with time.time() if it’s zero.

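from dataclasses import dataclass, field

@dataclass
class JointState:
    # Mutable defaults need default_factory; a bare [] is rejected by dataclass.
    names: list[str] = field(default_factory=list)
    positions: list[float] = field(default_factory=list)
    velocity: list[float] = field(default_factory=list)
    effort: list[float] = field(default_factory=list)
    stamp: float = 0.0
    frame_id: str = ""

@dataclass
class Joy:
    axes: list[float] = field(default_factory=list)
    buttons: list[int] = field(default_factory=list)
    stamp: float = 0.0

@dataclass
class JoystickCommand:
    sequence_id: int = 0
    axes: list[float] = field(default_factory=list)
    buttons: list[int] = field(default_factory=list)
    stamp: float = 0.0

decode_control(payload: bytes) -> JointState | Joy | JoystickCommand | dict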

Decodes a JSON payload by inspecting its type field. Unknown types come back as a raw dict.


Robot.publish(priority=…) accepts a 0–255 integer that’s mapped to the 8-class priority enum:

| Range | Class |
| --- | --- |
| 240–255 | REAL_TIME |
| 210–239 | INTERACTIVE_HIGH |
| 180–209 | INTERACTIVE_LOW |
| 140–179 | DATA_HIGH |
| 100–139 | DATA |
| 60–99 | DATA_LOW |
| 0–59 | BACKGROUND |

Higher numeric values drain ahead of lower ones under congestion.
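
The table above can be expressed as a small lookup. This is a sketch of the documented ranges only; priority_class is a hypothetical helper (the SDK's own mapping lives in _to_zenoh_priority), and raising ValueError outside 0–255 is an assumption.

```python
# (inclusive lower bound, class name), ordered from highest to lowest.
_PRIORITY_FLOORS = [
    (240, "REAL_TIME"),
    (210, "INTERACTIVE_HIGH"),
    (180, "INTERACTIVE_LOW"),
    (140, "DATA_HIGH"),
    (100, "DATA"),
    (60, "DATA_LOW"),
    (0, "BACKGROUND"),
]


def priority_class(value: int) -> str:
    """Return the priority class name for a 0-255 numeric priority."""
    if not 0 <= value <= 255:
        raise ValueError("priority must be in 0..255")
    for floor, name in _PRIORITY_FLOORS:
        if value >= floor:
            return name
    raise AssertionError("unreachable: 0 is the lowest floor")
```

Under this mapping, the default priority=200 used by Robot.publish falls in INTERACTIVE_LOW.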


The adamo.data submodule connects to the recorded-data store rather than the live network. Use it to download recorded sessions, browse topics, decode video, build temporally-aligned tables, and produce PyTorch datasets for training.

This module is Python only — there is no equivalent in the Rust or C SDKs.

from adamo.data import connect, DataClient, AdamoDataset, SessionMetadata, Record, VideoIndex, Frame

adamo.data.connect(
    *,
    api_key: str,
    api_url: str = "...",
    store_url: str = "https://store.adamohq.com",
) -> DataClient

Open a DataClient. Exchanges the API key for a short-lived JWT internally and refreshes when it expires.

class adamo.data.DataClient

client.list_sessions(
    *,
    after: str | float | datetime | None = None,
    before: str | float | datetime | None = None,
    name_contains: str | None = None,
) -> list[SessionMetadata]
client.get_session(session_id: str) -> SessionMetadata
client.get_topics(session_id: str) -> list[str]
client.match_topics(session_id: str, pattern: str) -> list[str]
client.message_count(session_id: str) -> int

pattern in match_topics supports * (single segment) and ** (any depth).

client.query_records(
    session_id: str,
    *topics: str,
    start: str | float | datetime | None = None,
    end: str | float | datetime | None = None,
) -> list[Record]
client.iter_records(...) -> Iterator[Record]  # streaming
client.export_records(..., chunk_size: int = 1000) -> Iterator[list[Record]]

*topics supports wildcards. Without arguments, returns every topic in the session.

client.aligned(
    session_id: str,
    *topics: str,
    start: ... = None,
    end: ... = None,
    window_ms: int = 50,
    hz: float | None = None,
) -> list[dict[str, Record]]

Temporally aligns records across two or more topic patterns. Returns one dict per timestep, mapping topic name to the matched Record. If hz is set, resamples to that frequency; otherwise aligns to the first topic’s timestamps.
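
To illustrate what aligning to the first topic's timestamps within window_ms can look like, here is a pure-Python sketch over plain timestamp lists. It is not the SDK's algorithm: align_nearest is hypothetical, it assumes nearest-neighbour matching, and it reports unmatched reference timestamps as None rather than dropping them.

```python
from bisect import bisect_left
from typing import Optional


def align_nearest(reference: list, other: list,
                  window_ms: float = 50) -> list:
    """Pair each reference timestamp with the nearest `other` timestamp.

    Both inputs are sorted lists of epoch seconds. A pair's second element
    is None when no `other` timestamp lies within `window_ms`.
    """
    window_s = window_ms / 1000.0
    pairs = []
    for t in reference:
        i = bisect_left(other, t)
        # Only the neighbours around the insertion point can be nearest.
        candidates = other[max(i - 1, 0): i + 1]
        best: Optional[float] = min(
            candidates, key=lambda c: abs(c - t), default=None
        )
        if best is not None and abs(best - t) <= window_s:
            pairs.append((t, best))
        else:
            pairs.append((t, None))
    return pairs
```

The binary search keeps each lookup logarithmic in the length of the second stream, which matters for long recordings.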

client.video_index(session_id: str, topic: str) -> VideoIndex
client.download_video(session_id: str, topic: str, output_path: str | Path) -> Path
client.iter_frames(
    session_id: str,
    topic: str,
    *,
    start: ... = None,
    end: ... = None,
    size: tuple[int, int] | None = None,
) -> Iterator[Frame]

iter_frames requires pip install av numpy. Each Frame.image is a (H, W, 3) uint8 RGB numpy array; size=(w, h) resizes.

client.episodes(
    *topics: str,
    sessions: list[str] | None = None,
    window_ms: int = 50,
) -> Iterator[dict[str, list[Record]]]

Iterate sessions as trajectory-level episodes. Each yielded dict maps topic name to a time-ordered list of Records.
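
Since each episode is a dict of time-ordered Record lists, a quick per-topic summary is easy to compute. The sketch below relies only on the documented Record.timestamp field (epoch seconds); episode_summary is a hypothetical helper.

```python
def episode_summary(episode: dict) -> dict:
    """Map each topic to (record_count, duration_seconds) for one episode."""
    summary = {}
    for topic, records in episode.items():
        if records:
            # Records are time-ordered, so first and last bound the span.
            duration = records[-1].timestamp - records[0].timestamp
        else:
            duration = 0.0
        summary[topic] = (len(records), duration)
    return summary
```

A summary like this can help filter out truncated or empty episodes before building a training set.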

client.to_dataframe(
    session_id: str,
    *topics: str,
    start: ... = None,
    end: ... = None,
)

Returns a pandas DataFrame with columns session_id, topic, timestamp, payload. Requires pip install 'adamo[data]'.

client.dataset(
    sessions: list,
    observation: dict[str, str | tuple[str, str]],
    action: str | tuple[str, str],
    obs_steps: int = 1,
    action_steps: int = 1,
    hz: float = 30.0,
    image_size: tuple[int, int] | None = None,
) -> AdamoDataset

Build a PyTorch-compatible dataset. observation maps user keys to topic specs:

  • "topic/path" — auto-detects video vs. raw bytes.
  • ("topic/pattern", "field_name") — JSON-decode payload, extract that field as a float array.

action takes the same form. Wildcards resolve automatically; if a pattern matches multiple topics the first is used and a warning is emitted.

client.close() -> None

Supports the context-manager protocol.

class adamo.data.AdamoDataset

A PyTorch-compatible dataset. All download / decode happens at construction; __getitem__ is O(1).

dataset[i] -> dict[str, torch.Tensor]

Each sample has dot-separated keys:

  • "observation.<key>" for each entry in observation, shape (obs_steps, …).
  • "action", shape (action_steps, …).

Video observations are float32 in [0, 1] with shape (obs_steps, 3, H, W). Numeric observations are (obs_steps, D).

dataset.stats: dict[str, dict[str, np.ndarray]]
# dataset.stats["observation.state"] = {"mean": …, "std": …}

Per-key mean and std for all non-video keys (plus the action key).

@dataclass
class SessionMetadata:
    id: str
    name: str
    status: str
    topics: list[str]
    started_at: float | None  # epoch seconds
    stopped_at: float | None
    message_count: int
    org_id: str

@dataclass
class Record:
    session_id: str
    topic: str
    payload: bytes
    timestamp: float  # epoch seconds

@dataclass
class VideoIndex:
    session_id: str
    topic: str
    frame_count: int
    keyframe_count: int
    duration_ms: int
    avg_fps: float
    segments: list[dict]

@dataclass
class Frame:
    topic: str
    timestamp: float
    image: object  # (H, W, 3) uint8 RGB numpy array

    @property
    def height(self) -> int: ...

    @property
    def width(self) -> int: ...

  • Session methods are synchronous. connect_async is the only async surface and only the API config fetch is async — the returned Session is the same sync object as connect.
  • Subscribe callbacks run on a background receive thread. Keep them short. For heavy work, push to a queue and process from your main thread.
  • The video pipeline runs in its own threads. Robot.run() blocks the calling thread driving it; track.send() is thread-safe.
  • Robot.recv() blocks the calling thread until a msg/* message arrives.
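
The advice to keep callbacks short and push heavy work to a queue can be sketched with the standard library alone. make_handoff and process_all are hypothetical helpers, and dropping samples when the queue is full is one possible policy chosen here so that the receive thread never blocks.

```python
import queue


def make_handoff(maxsize: int = 1000):
    """Return (callback, q): the callback enqueues samples without blocking."""
    q = queue.Queue(maxsize=maxsize)

    def on_sample(sample) -> None:
        try:
            q.put_nowait(sample)
        except queue.Full:
            pass  # drop the sample rather than stall the receive thread

    return on_sample, q


def process_all(q, handle, *, idle_timeout: float = 0.2) -> int:
    """Run handle(sample) on the calling thread until the queue stays idle."""
    processed = 0
    while True:
        try:
            sample = q.get(timeout=idle_timeout)
        except queue.Empty:
            return processed
        handle(sample)
        processed += 1
```

The callback would be passed to a subscribe call, while process_all runs on the main thread. A bounded queue trades completeness for bounded memory; an unbounded queue is the alternative when no sample may be lost.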

The SDK raises ValueError for bad arguments (mutually exclusive options, malformed specs), RuntimeError for state errors (closed track / session without API key), TimeoutError from Robot.recv(timeout=…), and ImportError from optional features when their extras aren’t installed.

The transport layer raises its own exceptions for connection / publish / subscribe failures; those propagate as-is.