Python SDK
The adamo Python package lets you publish, subscribe, and send control commands over the Adamo network. It works on robots, workstations, and servers — anywhere you can run Python 3.10+.
Install

    pip install adamo

Connect

    import adamo

    session = adamo.connect(api_key="ak_your_key")

The session connects to the Adamo network. All topic keys are automatically prefixed with your organization’s namespace (adamo/{org}/), so you never manage that prefix yourself.
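To make the prefixing concrete, here is a minimal sketch of how a short key might map to its fully qualified form and back. The helper names qualify and strip_prefix and the organization name acme are hypothetical; the SDK handles this internally and does not necessarily expose such functions.

```python
NAMESPACE_ROOT = "adamo"  # root of the adamo/{org}/ prefix described above


def qualify(org: str, key: str) -> str:
    """Return the fully qualified key for a short key (hypothetical helper)."""
    return f"{NAMESPACE_ROOT}/{org}/{key}"


def strip_prefix(org: str, full_key: str) -> str:
    """Remove the org prefix from a fully qualified key (hypothetical helper)."""
    prefix = f"{NAMESPACE_ROOT}/{org}/"
    if not full_key.startswith(prefix):
        raise ValueError(f"{full_key!r} is outside namespace {prefix!r}")
    return full_key[len(prefix):]


full = qualify("acme", "my-robot/status")
print(full)                        # adamo/acme/my-robot/status
print(strip_prefix("acme", full))  # my-robot/status
```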
The session is a context manager:

    with adamo.connect(api_key="ak_your_key") as session:
        session.put("my-robot/status", b"ready")

Subscribe to Data
Blocking iterator

    with session.subscribe("my-robot/sensors/**") as sub:
        for sample in sub:
            print(f"{sample.key}: {sample.payload}")

Callback (non-blocking)

    def on_data(sample):
        print(f"{sample.key}: {sample.payload}")

    sub = session.subscribe("my-robot/sensors/**", callback=on_data)

Call sub.close() when you’re done.
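If a callback has to do slow work, one common pattern is to hand samples to a queue and process them elsewhere. The sketch below assumes the callback may be invoked from a background thread, which this page does not state, and uses a stand-in FakeSample class in place of a real sample. It relies only on the standard library.

```python
import queue
from dataclasses import dataclass


@dataclass
class FakeSample:
    """Stand-in for an SDK sample; only key and payload are modelled."""
    key: str
    payload: bytes


# Bounded queue so a slow consumer cannot grow memory without limit.
inbox: queue.Queue = queue.Queue(maxsize=100)


def on_data(sample) -> None:
    """Callback: enqueue and return quickly; drop the sample if the queue is full."""
    try:
        inbox.put_nowait(sample)
    except queue.Full:
        pass  # dropping is a design choice in this sketch


def drain() -> list:
    """Consumer side: pull everything currently queued without blocking."""
    items = []
    while True:
        try:
            items.append(inbox.get_nowait())
        except queue.Empty:
            return items


# Simulate two deliveries, then process them on the consumer side.
on_data(FakeSample("my-robot/sensors/imu", b"1"))
on_data(FakeSample("my-robot/sensors/temperature", b"22.5"))
for s in drain():
    print(f"{s.key}: {s.payload!r}")
```

In a real program, on_data would be passed as callback=on_data to session.subscribe(), and drain() would run in the main loop.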
Wildcards
| Pattern | Matches |
|---|---|
| my-robot/sensors/** | Everything under sensors/ |
| my-robot/sensors/* | One level under sensors/ |
| */sensors/imu | Any robot’s IMU |
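The table can be read as rules over slash-separated segments. Below is a small matcher sketch, assuming Zenoh-style semantics in which * matches exactly one segment and ** matches any number of segments, including zero. The function key_matches is illustrative and not part of the SDK, whose matching may differ in edge cases.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Return True if a slash-separated key matches a wildcard pattern."""

    def match(p: list, k: list) -> bool:
        if not p:
            return not k  # pattern exhausted: match only if key is too
        head, rest = p[0], p[1:]
        if head == "**":
            # Try letting ** consume 0, 1, 2, ... segments of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("/"), key.split("/"))


print(key_matches("my-robot/sensors/**", "my-robot/sensors/imu/accel"))  # True
print(key_matches("my-robot/sensors/*", "my-robot/sensors/imu"))         # True
print(key_matches("my-robot/sensors/*", "my-robot/sensors/imu/accel"))   # False
print(key_matches("*/sensors/imu", "other-robot/sensors/imu"))           # True
```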
Sample fields
- sample.key: the topic key, with the org prefix stripped
- sample.payload: bytes
- sample.timestamp: Zenoh timestamp, or None
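Because the payload arrives as raw bytes, the consumer decides how to decode it. A minimal sketch, assuming the publisher sends UTF-8 JSON; decode_json is a hypothetical helper, and a SimpleNamespace stands in for a real sample.

```python
import json
from types import SimpleNamespace


def decode_json(sample):
    """Decode a bytes payload as UTF-8 JSON; return None if it is not valid JSON."""
    try:
        return json.loads(sample.payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


# Stand-in object carrying the three fields listed above.
sample = SimpleNamespace(
    key="my-robot/sensors/imu",
    payload=b'{"x": 0.1, "y": 0.0, "z": 9.8}',
    timestamp=None,
)
reading = decode_json(sample)
print(sample.key, reading["z"])
```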
Publish Data
One-shot put

    session.put("my-robot/sensors/temperature", b"22.5")

Streaming publisher
Use a persistent publisher when putting to the same key repeatedly:

    import time

    with session.publisher("my-robot/sensors/imu", express=True) as pub:
        while True:
            pub.put(b'{"x": 0.1, "y": 0.0, "z": 9.8}')
            time.sleep(0.01)

express=True bypasses batching for lower latency.
Send Control Commands
control_publisher() returns a publisher pre-configured for real-time control: REAL_TIME priority, drop-on-congestion, best-effort reliability, and express mode. A command that arrives 200 ms late is worse than no command at all.
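The same reasoning can be applied on the receiving side: a robot can discard commands that are already too old when they arrive. The following is a sketch under assumptions the SDK does not make: the sender embeds a sent_at wall-clock timestamp in the JSON payload, sender and receiver clocks are synchronized, and 200 ms is used as the cutoff. None of these names are part of the adamo API.

```python
import json
import time

MAX_AGE_S = 0.2  # assumed cutoff, borrowed from the 200 ms figure above


def make_command(positions, now=None) -> bytes:
    """Encode a command with a send timestamp (hypothetical wire format)."""
    sent_at = time.time() if now is None else now
    return json.dumps({"sent_at": sent_at, "positions": positions}).encode()


def accept(payload: bytes, now=None):
    """Return the decoded command, or None if it is older than MAX_AGE_S."""
    now = time.time() if now is None else now
    cmd = json.loads(payload)
    if now - cmd["sent_at"] > MAX_AGE_S:
        return None  # stale: ignore rather than act on it
    return cmd


fresh = make_command([0.1, 0.5], now=100.0)
print(accept(fresh, now=100.05) is not None)  # True: 50 ms old
print(accept(fresh, now=100.30) is None)      # True: 300 ms old
```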
Sending JSON
You can publish arbitrary JSON as control commands:

    import json

    with session.control_publisher("my-robot", "arm_cmd") as pub:
        pub.put(json.dumps({
            "positions": [0.1, 0.5, -0.3, 0.0, 0.8, -0.1],
            "gripper": 0.5,
        }).encode())

Built-in message types
The SDK includes JointState and Joy types that mirror their ROS equivalents and serialize to JSON:

    from adamo import JointState, Joy

    # Gamepad-style control
    joy = Joy(axes=[0.0, 0.5, 0.0, -0.3], buttons=[0, 0, 1, 0])

    with session.control_publisher("my-robot", "joy") as pub:
        pub.put(joy.to_json())

    # Joint position control (GELLO and similar)
    js = JointState(
        names=["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"],
        positions=[0.1, 0.5, -0.3, 0.0, 0.8, -0.1],
    )

    with session.control_publisher("my-robot", "joint_states") as pub:
        pub.put(js.to_json())

Control loop
For a real GELLO integration, call pub.put() in a tight loop:

    import time
    from adamo import JointState

    JOINT_NAMES = ["j1", "j2", "j3", "j4", "j5", "j6", "j7"]

    def read_joints() -> list[float]:
        # Replace with your Dynamixel driver read
        return [0.0] * len(JOINT_NAMES)

    with session.control_publisher("my-robot") as pub:
        while True:
            js = JointState(names=JOINT_NAMES, positions=read_joints())
            pub.put(js.to_json())
            time.sleep(0.01)  # 100 Hz
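Sleeping a fixed 10 ms after each iteration yields slightly less than 100 Hz, because the time spent reading joints and publishing is added to every cycle. One alternative is to schedule against absolute deadlines from a monotonic clock. The sketch below is generic Python; run_at_rate and the step callable are hypothetical stand-ins for the body of the loop above, and the iteration count exists only so the example terminates.

```python
import time


def run_at_rate(step, hz: float, iterations: int) -> float:
    """Call step() at a fixed rate; return the total elapsed seconds."""
    period = 1.0 / hz
    start = time.monotonic()
    next_deadline = start
    for _ in range(iterations):
        step()
        next_deadline += period
        remaining = next_deadline - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
        else:
            # Overran the period: resynchronize instead of bursting to catch up.
            next_deadline = time.monotonic()
    return time.monotonic() - start


calls = []
elapsed = run_at_rate(lambda: calls.append(time.monotonic()), hz=100, iterations=10)
print(len(calls), f"{elapsed:.3f}s")
```

In a control loop, step would read the joints and call pub.put(), and the for loop would become while True.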