Python SDK

The adamo Python package lets you publish, subscribe, and send control commands over the Adamo network. It works on robots, workstations, and servers — anywhere you can run Python 3.10+.

pip install adamo

import adamo
session = adamo.connect(api_key="ak_your_key")

The session connects to the Adamo network. All topic keys are automatically prefixed with your organization’s namespace (adamo/{org}/), so you never manage that prefix yourself.
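To make the prefixing concrete, here is a minimal sketch of the mapping between the keys you write and the full network keys. The helper names `with_org_prefix` and `strip_org_prefix` are hypothetical and are not part of the adamo package; the SDK performs this step for you.

```python
# Illustrative only: these helpers are hypothetical, not adamo APIs.
ORG_PREFIX_TEMPLATE = "adamo/{org}/"


def with_org_prefix(org: str, key: str) -> str:
    """Return the full network key for a user-facing key."""
    return ORG_PREFIX_TEMPLATE.format(org=org) + key


def strip_org_prefix(org: str, full_key: str) -> str:
    """Return the user-facing key, removing the org prefix if present."""
    return full_key.removeprefix(ORG_PREFIX_TEMPLATE.format(org=org))


full = with_org_prefix("acme", "my-robot/status")
print(full)
print(strip_org_prefix("acme", full))
```

In your own code you only ever work with the short form, such as my-robot/status.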

The session is a context manager:

with adamo.connect(api_key="ak_your_key") as session:
    session.put("my-robot/status", b"ready")

To receive data, subscribe to a key pattern and iterate over the samples:

with session.subscribe("my-robot/sensors/**") as sub:
    for sample in sub:
        print(f"{sample.key}: {sample.payload}")

Alternatively, pass a callback:

def on_data(sample):
    print(f"{sample.key}: {sample.payload}")

sub = session.subscribe("my-robot/sensors/**", callback=on_data)

Call sub.close() when you’re done.

Pattern                 Matches
my-robot/sensors/**     Everything under sensors/
my-robot/sensors/*      One level under sensors/
*/sensors/imu           Any robot’s IMU
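The wildcard rules in the table can be sketched as a small matcher. This is an illustration of the semantics only, not the SDK's matcher: `key_matches` is a hypothetical helper, and it assumes that `**` may also match zero levels, which the table does not state.

```python
def key_matches(pattern: str, key: str) -> bool:
    """Return True if key matches pattern.

    '*' matches exactly one level; '**' matches any number of levels
    (assumed here to include zero).
    """

    def match(p: list[str], k: list[str]) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "**":
            # Try letting '**' absorb 0, 1, 2, ... levels of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("/"), key.split("/"))


print(key_matches("my-robot/sensors/**", "my-robot/sensors/imu/accel"))
print(key_matches("my-robot/sensors/*", "my-robot/sensors/imu/accel"))
print(key_matches("*/sensors/imu", "robot-2/sensors/imu"))
```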

Each sample exposes:

  • sample.key: the topic key, with the org prefix stripped
  • sample.payload: bytes
  • sample.timestamp: Zenoh timestamp, or None

To publish a single value, call put() on the session:

session.put("my-robot/sensors/temperature", b"22.5")
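Because payloads are raw bytes in both directions, the publisher and subscriber have to agree on an encoding. The following sketch shows one possible convention using UTF-8 text and JSON; the helper names are hypothetical, not adamo APIs.

```python
import json


def encode_reading(value: float) -> bytes:
    """Encode a numeric reading as UTF-8 text, e.g. 22.5 -> b"22.5"."""
    return str(value).encode("utf-8")


def decode_reading(payload: bytes) -> float:
    """Decode a payload produced by encode_reading()."""
    return float(payload.decode("utf-8"))


def encode_json(obj: dict) -> bytes:
    """Encode a dict as UTF-8 JSON bytes."""
    return json.dumps(obj).encode("utf-8")


def decode_json(payload: bytes) -> dict:
    """Decode a payload produced by encode_json()."""
    return json.loads(payload.decode("utf-8"))


print(decode_reading(b"22.5"))
print(decode_json(encode_json({"x": 0.1, "y": 0.0, "z": 9.8})))
```

On the subscriber side, something like decode_reading(sample.payload) would turn the bytes back into a number.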

Use a persistent publisher when putting to the same key repeatedly:

import time

with session.publisher("my-robot/sensors/imu", express=True) as pub:
    while True:
        pub.put(b'{"x": 0.1, "y": 0.0, "z": 9.8}')
        time.sleep(0.01)

express=True bypasses batching for lower latency.

control_publisher() returns a publisher pre-configured for real-time control — REAL_TIME priority, drop-on-congestion, best-effort reliability, express mode. A command that arrives 200ms late is worse than no command at all.

You can publish arbitrary JSON as control commands:

import json

with session.control_publisher("my-robot", "arm_cmd") as pub:
    pub.put(json.dumps({
        "positions": [0.1, 0.5, -0.3, 0.0, 0.8, -0.1],
        "gripper": 0.5,
    }).encode())

The SDK includes JointState and Joy types that mirror their ROS equivalents and serialize to JSON:

from adamo import JointState, Joy

# Gamepad-style control
joy = Joy(axes=[0.0, 0.5, 0.0, -0.3], buttons=[0, 0, 1, 0])
with session.control_publisher("my-robot", "joy") as pub:
    pub.put(joy.to_json())

# Joint position control (GELLO and similar)
js = JointState(
    names=["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"],
    positions=[0.1, 0.5, -0.3, 0.0, 0.8, -0.1],
)
with session.control_publisher("my-robot", "joint_states") as pub:
    pub.put(js.to_json())

For a real GELLO integration, call pub.put() in a tight loop:

import time
from adamo import JointState

JOINT_NAMES = ["j1", "j2", "j3", "j4", "j5", "j6", "j7"]

def read_joints() -> list[float]:
    # Replace with your Dynamixel driver read
    return [0.0] * len(JOINT_NAMES)

with session.control_publisher("my-robot") as pub:
    while True:
        js = JointState(names=JOINT_NAMES, positions=read_joints())
        pub.put(js.to_json())
        time.sleep(0.01)  # 100 Hz
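A fixed time.sleep(0.01) runs slightly slower than 100 Hz, because the time spent reading joints and publishing is added to every cycle. One possible alternative is to schedule against a monotonic clock, as sketched below. `run_at_rate` is a hypothetical helper, not part of the adamo package, and it takes an iteration count only so that the example terminates.

```python
import time
from typing import Callable


def run_at_rate(step: Callable[[], None], hz: float, iterations: int) -> None:
    """Call step() `iterations` times, targeting `hz` calls per second."""
    period = 1.0 / hz
    next_tick = time.monotonic()
    for _ in range(iterations):
        step()
        next_tick += period
        # Sleep only for the time remaining until the next scheduled tick.
        delay = next_tick - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        else:
            # Fell behind: reschedule from now instead of bursting to catch up.
            next_tick = time.monotonic()


calls: list[int] = []
run_at_rate(lambda: calls.append(len(calls)), hz=100.0, iterations=5)
print(len(calls))
```

In the GELLO loop, step would build the JointState from read_joints() and call pub.put(js.to_json()).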