Receiving Control

A robot becomes useful when it does something with the input it receives. There are two sources of control:

  1. operate.adamohq.com — gamepad and VR input from the hosted operator, published as CDR-encoded payloads on control/joy, head_pose, and the controller poses.
  2. Your own custom operator — a teleop program you wrote yourself, publishing whatever topics and payload format you like (typically JSON). See Building Your Own Operator.

For both, the SDK primitive is the same: subscribe to a key expression and react to incoming samples.

The web app publishes a Joy message to {robot}/control/joy whenever the gamepad changes. The wire format is CDR.

import adamo
from pycdr2 import CdrReader  # pip install pycdr2

robot = adamo.Robot(api_key="ak_...", name="my-arm")

@robot.on("my-arm", "control/joy", decode=None)
def on_joy(payload: bytes):
    rdr = CdrReader(payload)
    rdr.read_header()  # Joy message header
    axes = list(rdr.read_float32_array())
    buttons = list(rdr.read_int32_array())
    # axes[0..1] = left stick X/Y, axes[2..3] = right stick X/Y, etc.
    drive(axes[0], axes[1])

robot.run()

The standard W3C / Xbox button and axis mapping is documented in the TypeScript SDK reference.

When a viewer enters immersive VR mode on a stereo track, the headset publishes pose data continuously:

| Topic | Payload | Description |
| --- | --- | --- |
| {robot}/head_pose | CDR-encoded PoseStamped | Headset pose |
| {robot}/controller/left | CDR-encoded PoseStamped | Left controller |
| {robot}/controller/right | CDR-encoded PoseStamped | Right controller |

Subscribe the same way as for gamepad input.

@robot.on("my-arm", "head_pose", decode=None)
def on_head(payload: bytes):
    pose = decode_pose_stamped(payload)
    update_head_tracking(pose)
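The handler above calls decode_pose_stamped without defining it. One way to write such a helper is sketched below. It is not part of the SDK. It assumes the payload is bare ROS 2 CDR: a 4-byte encapsulation header followed by a geometry_msgs/msg/PoseStamped body (stamp, frame_id, position, orientation). The PoseStamped dataclass is a hypothetical container introduced only for this example. If your payload arrives inside the ROS envelope described later on this page, strip the envelope first.

```python
import struct
from dataclasses import dataclass


@dataclass
class PoseStamped:
    """Hypothetical container for the decoded fields."""
    sec: int
    nanosec: int
    frame_id: str
    position: tuple      # (x, y, z) in meters
    orientation: tuple   # quaternion (x, y, z, w)


def decode_pose_stamped(payload: bytes) -> PoseStamped:
    """Decode CDR bytes assumed to hold a geometry_msgs/msg/PoseStamped."""
    if len(payload) < 4:
        raise ValueError("payload too short for a CDR encapsulation header")
    # Second header byte: 0x01 means little-endian, 0x00 means big-endian.
    endian = "<" if payload[1] & 1 else ">"
    body = payload[4:]  # CDR alignment is counted from the start of the body

    # Header: stamp.sec (int32), stamp.nanosec (uint32), frame_id length (uint32).
    sec, nanosec, str_len = struct.unpack_from(endian + "iII", body, 0)
    offset = 12
    # The string length includes the trailing NUL terminator.
    frame_id = body[offset:offset + str_len].rstrip(b"\x00").decode("utf-8")
    offset += str_len

    # The pose is seven float64 values, aligned to an 8-byte boundary.
    offset = (offset + 7) & ~7
    values = struct.unpack_from(endian + "7d", body, offset)
    return PoseStamped(sec, nanosec, frame_id, values[0:3], values[3:7])
```

struct.unpack_from raises struct.error if the payload is shorter than the layout requires, so a malformed sample fails loudly instead of producing a partial pose.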

When you write your own operator (XR app, mobile, GELLO leader, …), you choose the topic name and the payload format. JSON is the default in the Python SDK and works across all three languages.

The example below is the robot-side receiver that pairs with the bimanual XR operator on the Building Your Own Operator page.

The @robot.on(...) decorator subscribes and decodes JSON in one step. Brace-wrapped path segments capture the matched value as a keyword argument.

import adamo

robot = adamo.Robot(api_key="ak_...", name="my-arm")

# Cameras streamed back to the operator
robot.attach_video("wrist_left", device="/dev/video0")
robot.attach_video("wrist_right", device="/dev/video1")
robot.attach_video("head", shm="head_cam")

# Bimanual hand controllers: {side} captures "left" or "right"
@robot.on("xr-operator", "control/xr/hand/{side}", priority=250)
def hand(msg, side):
    move_arm(side, msg["pos"], msg["quat"])
    set_gripper(side, msg["trigger"])

# Head pose: separate handler
@robot.on("xr-operator", "control/xr/head", priority=250)
def on_head(msg):
    update_head_tracking(msg["pos"], msg["quat"])

robot.run()

The first argument to @robot.on(...) is the broadcaster name — the operator publishing those topics. The decorator subscribes to {broadcaster}/{track} under the hood.

A bilateral teleop setup sends measured follower joint efforts back to the leader on a real-time control topic. The leader keeps only the newest sample, multiplies it by a negative gain, and applies the result through the leader arm’s external-effort mode.

The example below isolates the feedback path. Replace the two hardware-driver functions with calls to your robot API:

  • follower side: read measured external efforts from the follower arm
  • leader side: apply external efforts to the leader arm

The payload is eight big-endian f64 values: [timestamp_seconds, effort_0, ..., effort_6].

import os
import struct
import sys
import time

import adamo

N_JOINTS = 7
RATE_HZ = 100.0
GAIN = 0.1
ROBOT = os.getenv("ADAMO_ROBOT_NAME", "my-arm")
EFFORT_TOPIC = f"{ROBOT}/control/force_feedback/follower_effort"

def now_seconds(session):
    return session.fabric_now_us() / 1_000_000.0

def pack_efforts(timestamp, efforts):
    # Wire format: timestamp followed by N_JOINTS efforts, all big-endian f64.
    if len(efforts) != N_JOINTS:
        raise ValueError(f"expected {N_JOINTS} joint efforts")
    return struct.pack("!" + "d" * (1 + N_JOINTS), timestamp, *efforts)

def unpack_efforts(payload):
    if len(payload) != 8 * (1 + N_JOINTS):
        raise ValueError(f"bad effort payload size: {len(payload)}")
    values = struct.unpack("!" + "d" * (1 + N_JOINTS), payload)
    return values[0], list(values[1:])

def read_follower_external_efforts():
    raise NotImplementedError("read joint efforts from your follower driver")

def apply_leader_external_efforts(efforts):
    raise NotImplementedError("send joint efforts to your leader driver")

def run_follower(session):
    with session.publisher(
        EFFORT_TOPIC,
        priority=250,
        express=True,
        reliable=False,
    ) as pub:
        while True:
            efforts = read_follower_external_efforts()
            pub.put(pack_efforts(now_seconds(session), efforts))
            time.sleep(1.0 / RATE_HZ)

def run_leader(session):
    teleop_started_at = now_seconds(session)
    with session.subscribe(EFFORT_TOPIC) as sub:
        while True:
            # Drain the queue so that only the newest sample is used.
            latest = None
            while True:
                sample = sub.try_recv()
                if sample is None:
                    break
                latest = sample
            if latest is not None:
                timestamp, measured = unpack_efforts(latest.payload)
                # Ignore samples stamped before this leader started.
                if timestamp >= teleop_started_at:
                    # Oppose the measured effort, scaled down by GAIN.
                    applied = [-GAIN * effort for effort in measured]
                    apply_leader_external_efforts(applied)
            time.sleep(1.0 / RATE_HZ)

session = adamo.connect(
    api_key=os.environ["ADAMO_API_KEY"],
)

# Run as "follower" or "leader" (the default), chosen by the first CLI argument.
mode = sys.argv[1] if len(sys.argv) > 1 else "leader"
if mode == "follower":
    run_follower(session)
else:
    run_leader(session)

Control topics should be published with REAL_TIME priority and dropped on congestion — a command that arrives 200 ms late is worse than no command at all. The robot side doesn’t choose priority on subscribe (the router uses the publisher’s choice), but it’s worth knowing the convention so your own operator programs follow it.

robot.publish("control/joy", priority=250, express=True)
# 0–255 mapped to 8 priority classes; ≥240 is REAL_TIME.

Robots publish video latency and frame-age stats on {robot}/stats/latency. Use them as a safety gate before applying teleop commands.

Python:

def on_safety(stats):
    if stats.is_stale:
        stop_robot()

session.watch_frame_safety("my-arm", on_safety, stale_after_ms=250)

TypeScript:

import { watchFrameSafety } from "@adamo/fleet";

await watchFrameSafety(session, org, "my-arm", ({ isStale }) => {
  if (isStale) stopRobot();
}, { staleAfterMs: 250 });

For lower-level dashboards, subscribe to video latency stats directly and read frameAgeMs, maxFrameAgeMs, droppedFrames, and channelLag.

See Building Your Own Operator for the publisher side end-to-end.

The current adamo-ts/examples/web frontend publishes operator input as real-time, best-effort messages. The exact key and payload depend on the input mode.

When the UI is using CDR mode, the bytes on the Adamo key are a small ROS envelope followed by the CDR-encoded ROS message:

u32 topic_length_be
utf8 topic
u32 type_length_be
utf8 type
bytes cdr_payload

The topic and type fields describe the inner ROS message. The cdr_payload is the serialized message named by type.
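The envelope layout above can be unpacked with the standard library alone. The sketch below assumes only the layout as listed: two length-prefixed UTF-8 strings with big-endian u32 lengths, followed by the CDR bytes. The function name parse_ros_envelope is hypothetical and not part of the SDK.

```python
import struct


def parse_ros_envelope(data: bytes):
    """Split an envelope into (topic, type_name, cdr_payload)."""
    offset = 0

    def read_string() -> str:
        nonlocal offset
        if offset + 4 > len(data):
            raise ValueError("truncated length prefix")
        (length,) = struct.unpack_from(">I", data, offset)  # big-endian u32
        offset += 4
        if offset + length > len(data):
            raise ValueError("declared string length exceeds envelope size")
        text = data[offset:offset + length].decode("utf-8")
        offset += length
        return text

    topic = read_string()
    type_name = read_string()
    # Everything after the two strings is the serialized inner message.
    return topic, type_name, data[offset:]
```

The returned cdr_payload can then be handed to whichever CDR decoder matches type_name.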

Gamepad input is published on:

adamo/{org}/{robot}/control/joy

By default the payload is a ROS envelope with:

| Inner field | Value |
| --- | --- |
| topic | /joy |
| type | sensor_msgs/msg/Joy |
| payload | CDR sensor_msgs/msg/Joy |

The Joy message uses header.frame_id = "joy" and contains six axes plus 21 button slots. Standard W3C / Xbox controllers populate buttons 0..16; buttons 17..20 are reserved and normally remain 0.

| Axis | Meaning | Range |
| --- | --- | --- |
| axes[0] | Left stick X | -1 left, +1 right |
| axes[1] | Left stick Y | -1 up, +1 down |
| axes[2] | Right stick X | -1 left, +1 right |
| axes[3] | Right stick Y | -1 up, +1 down |
| axes[4] | Left trigger analog | 0 released, 1 pressed |
| axes[5] | Right trigger analog | 0 released, 1 pressed |

| Button | Xbox / W3C control |
| --- | --- |
| buttons[0] | A |
| buttons[1] | B |
| buttons[2] | X |
| buttons[3] | Y |
| buttons[4] | LB |
| buttons[5] | RB |
| buttons[6] | LT pressed |
| buttons[7] | RT pressed |
| buttons[8] | Back / Select |
| buttons[9] | Start |
| buttons[10] | Left stick click |
| buttons[11] | Right stick click |
| buttons[12] | D-pad up |
| buttons[13] | D-pad down |
| buttons[14] | D-pad left |
| buttons[15] | D-pad right |
| buttons[16] | Guide / Xbox |
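On the robot side it is often easier to work with names than with raw indices. The sketch below turns the decoded axes and buttons arrays into named values using the mapping in the tables above. The identifiers (XBOX_AXES, XBOX_BUTTONS, named_axes, pressed_buttons) are illustrative choices, not SDK names.

```python
# Index order follows the axis and button tables above.
XBOX_AXES = [
    "left_stick_x", "left_stick_y",
    "right_stick_x", "right_stick_y",
    "left_trigger", "right_trigger",
]

XBOX_BUTTONS = [
    "A", "B", "X", "Y", "LB", "RB", "LT", "RT",
    "Back", "Start", "Left stick click", "Right stick click",
    "D-pad up", "D-pad down", "D-pad left", "D-pad right", "Guide",
]


def named_axes(axes):
    """Map the first six axis values to their names."""
    return dict(zip(XBOX_AXES, axes))


def pressed_buttons(buttons):
    """Return the names of pressed buttons; reserved slots 17..20 are ignored."""
    return [name for name, value in zip(XBOX_BUTTONS, buttons) if value]
```

Because zip stops at the shorter sequence, the helpers also tolerate messages that carry fewer slots than expected.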

If joystick serialization is switched to JSON in the UI, the same key carries:

{
  "type": "JoystickCommand",
  "sequence_id": 42,
  "stamp": 1710000000.123,
  "axes": [0, 0, 0, 0, 0, 0],
  "buttons": [0, 0, 0]
}

The Logitech Extreme 3D Pro uses the same control/joy key and payload format, but the axes are mapped as a joystick:

| Axis | Meaning |
| --- | --- |
| axes[0] | Stick X |
| axes[1] | Stick Y |
| axes[2] | Twist / rudder |
| axes[3] | Throttle, with forward usually negative |
| axes[4] | Unused, normally 0 |
| axes[5] | Unused, normally 0 |

Buttons 0..11 are copied from the physical buttons. Buttons 12..15 are the hat switch when the browser reports it as buttons.

XR tracking is published on one Adamo key:

adamo/{org}/{robot}/control/cdr/xr_tracking

Each publish contains one ROS envelope. Consumers should decode the envelope and dispatch by the inner topic:

| Inner topic | Type | Contents |
| --- | --- | --- |
| /head_pose | geometry_msgs/msg/PoseStamped | Head pose |
| /controller/{handedness} | geometry_msgs/msg/PoseStamped | Physical controller grip pose, or hand wrist pose when hand tracking is active without a physical controller |
| /controller/{handedness}/joy | sensor_msgs/msg/Joy | XR controller axes and buttons |
| /hand/{handedness} | geometry_msgs/msg/PoseArray | 25 hand joint poses when hand tracking is enabled |

{handedness} is normally left or right. Pose headers use frame_id = "xr_origin". Positions are in meters in the WebXR local-floor reference space. Orientations are published as ROS quaternions {x, y, z, w}; the frontend converts from WebXR’s internal [w, x, y, z] order before encoding the ROS message.
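Dispatching by inner topic might look like the sketch below. It takes the topic string already extracted from the envelope and returns a message kind plus the handedness, if any. The kind labels and the function name classify_xr_topic are assumptions made for this example.

```python
import re

# Inner-topic patterns from the table above, paired with illustrative labels.
# The /joy pattern is listed before the plain controller pattern for clarity;
# the anchored regexes would not overlap in either order.
_ROUTES = [
    (re.compile(r"^/head_pose$"), "head_pose"),
    (re.compile(r"^/controller/(?P<hand>[^/]+)/joy$"), "controller_joy"),
    (re.compile(r"^/controller/(?P<hand>[^/]+)$"), "controller_pose"),
    (re.compile(r"^/hand/(?P<hand>[^/]+)$"), "hand_joints"),
]


def classify_xr_topic(topic: str):
    """Return (kind, handedness); both are None for an unknown topic."""
    for pattern, kind in _ROUTES:
        match = pattern.match(topic)
        if match:
            return kind, match.groupdict().get("hand")
    return None, None
```

A receiver can look up a handler by the returned kind and pass the handedness along, ignoring topics that classify as (None, None).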

XR controller Joy messages use:

const rawAxisCount = xrGamepad.axes.length;
axes = [
  ...xrGamepad.axes,
  ...xrGamepad.buttons.map((button) => button.value),
];
buttons = xrGamepad.buttons.map((button) => button.pressed ? 1 : 0);

The raw axis and button order is the order reported by the WebXR runtime for that controller. The UI does not remap XR controller buttons to Xbox-style button indices.

For controllers using the WebXR xr-standard mapping, the Joy values are:

| Joy value | WebXR value | Meaning |
| --- | --- | --- |
| axes[0] | xrGamepad.axes[0] | Primary touchpad X, or placeholder 0 |
| axes[1] | xrGamepad.axes[1] | Primary touchpad Y, or placeholder 0 |
| axes[2] | xrGamepad.axes[2] | Primary thumbstick X |
| axes[3] | xrGamepad.axes[3] | Primary thumbstick Y |
| buttons[0] | xrGamepad.buttons[0].pressed | Primary trigger pressed |
| axes[rawAxisCount + 0] | xrGamepad.buttons[0].value | Primary trigger analog value |
| buttons[1] | xrGamepad.buttons[1].pressed | Grip / squeeze pressed |
| axes[rawAxisCount + 1] | xrGamepad.buttons[1].value | Grip / squeeze analog value |
| buttons[2] | xrGamepad.buttons[2].pressed | Primary touchpad pressed, if present |
| axes[rawAxisCount + 2] | xrGamepad.buttons[2].value | Primary touchpad button value |
| buttons[3] | xrGamepad.buttons[3].pressed | Primary thumbstick pressed, if present |
| axes[rawAxisCount + 3] | xrGamepad.buttons[3].value | Primary thumbstick button value |
| buttons[4] | xrGamepad.buttons[4].pressed | First extra button. On current Quest/Pico-style controllers this is usually X on left hand and A on right hand. |
| axes[rawAxisCount + 4] | xrGamepad.buttons[4].value | First extra button value |
| buttons[5] | xrGamepad.buttons[5].pressed | Second extra button. On current Quest/Pico-style controllers this is usually Y on left hand and B on right hand. |
| axes[rawAxisCount + 5] | xrGamepad.buttons[5].value | Second extra button value |

rawAxisCount is commonly 4 for thumbstick controllers, so the trigger analog value is commonly axes[4], grip is axes[5], thumbstick button value is axes[7], and the first extra face button value is axes[8]. Check axes.length - buttons.length if you want to derive the offset at runtime.
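The offset rule can be applied on the receiving side as in the sketch below, which splits a decoded XR Joy message back into raw axes and analog button values. The function name split_xr_joy is hypothetical.

```python
def split_xr_joy(axes, buttons):
    """Separate raw WebXR axes from the analog button values appended to them."""
    # axes = raw axes + one analog value per button, so the raw axis count
    # is whatever remains after removing one slot per button.
    raw_axis_count = len(axes) - len(buttons)
    if raw_axis_count < 0:
        raise ValueError("axes array is shorter than buttons array")
    raw_axes = list(axes[:raw_axis_count])
    button_values = list(axes[raw_axis_count:])
    return raw_axes, button_values


# Thumbstick controller with 4 raw axes and 6 buttons (values are made up).
raw_axes, values = split_xr_joy(
    [0.0, 0.0, 0.3, -0.5, 0.9, 0.2, 0.0, 0.0, 1.0, 0.0],
    [1, 0, 0, 0, 1, 0],
)
trigger, grip = values[0], values[1]
```

Indexing values by button index avoids hard-coding axes[4] and axes[5], which only hold when rawAxisCount is 4.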

Hand tracking publishes PoseArray joints in this fixed order:

wrist,
thumb-metacarpal, thumb-phalanx-proximal, thumb-phalanx-distal, thumb-tip,
index-finger-metacarpal, index-finger-phalanx-proximal, index-finger-phalanx-intermediate, index-finger-phalanx-distal, index-finger-tip,
middle-finger-metacarpal, middle-finger-phalanx-proximal, middle-finger-phalanx-intermediate, middle-finger-phalanx-distal, middle-finger-tip,
ring-finger-metacarpal, ring-finger-phalanx-proximal, ring-finger-phalanx-intermediate, ring-finger-phalanx-distal, ring-finger-tip,
pinky-finger-metacarpal, pinky-finger-phalanx-proximal, pinky-finger-phalanx-intermediate, pinky-finger-phalanx-distal, pinky-finger-tip
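Since the order is fixed, a receiver can label the 25 poses by position. The sketch below builds the joint-name list from the order above and pairs it with a decoded PoseArray, assuming the poses are already available as a Python sequence. HAND_JOINTS, JOINT_INDEX, and label_hand_poses are illustrative names.

```python
_FINGERS = ["index-finger", "middle-finger", "ring-finger", "pinky-finger"]
_FINGER_PARTS = [
    "metacarpal", "phalanx-proximal", "phalanx-intermediate",
    "phalanx-distal", "tip",
]

# Wrist first, then the thumb (which has no intermediate phalanx),
# then five joints for each remaining finger.
HAND_JOINTS = (
    ["wrist", "thumb-metacarpal", "thumb-phalanx-proximal",
     "thumb-phalanx-distal", "thumb-tip"]
    + [f"{finger}-{part}" for finger in _FINGERS for part in _FINGER_PARTS]
)

JOINT_INDEX = {name: index for index, name in enumerate(HAND_JOINTS)}


def label_hand_poses(poses):
    """Pair each pose in a decoded PoseArray with its joint name."""
    if len(poses) != len(HAND_JOINTS):
        raise ValueError(f"expected {len(HAND_JOINTS)} poses, got {len(poses)}")
    return dict(zip(HAND_JOINTS, poses))
```

For example, JOINT_INDEX["index-finger-tip"] gives the position of the index fingertip pose within the array.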