Sending and Receiving Data
Beyond video, every Adamo participant can publish, subscribe, and query arbitrary key expressions. This page covers those primitives: publish, subscribe, one-shot get, and the priority / reliability knobs that decide how a message travels.
For control specifically, see Receiving Control.
Publishing
Section titled “Publishing”One-shot put
Section titled “One-shot put”For occasional values — config, single events, “I’m done” notifications.
import adamosession = adamo.connect(api_key="ak_...")session.put("my-arm/sensors/temperature", b"22.5")use adamo::{PublishOptions, Session};let session = Session::open_default("ak_...")?;session.put("my-arm/sensors/temperature", b"22.5", PublishOptions::default())?;adamo_session_t *sess = adamo_open_default("ak_...");adamo_put(sess, "my-arm/sensors/temperature", (uint8_t*)"22.5", 4, /* priority */ 128, /* express */ 0);Persistent publisher
Section titled “Persistent publisher”For repeated puts on the same key (telemetry, joint state, etc.). Avoids re-resolving the key expression each time and lets you set declare-time options like express and reliability once.
import time, jsonwith session.publisher("my-arm/sensors/imu", express=True) as pub: while True: pub.put(json.dumps({"x": 0.1, "y": 0.0, "z": 9.8}).encode()) time.sleep(0.01)use adamo::{PublisherOptions, Session};let session = Session::open_default("ak_...")?;let pub_ = session.publisher( "my-arm/sensors/imu", PublisherOptions { priority: 128, express: true, reliable: false },)?;loop { pub_.put(b"{\"x\":0.1,\"y\":0.0,\"z\":9.8}")?; std::thread::sleep(std::time::Duration::from_millis(10));}adamo_publisher_t *pub = adamo_publisher( sess, "my-arm/sensors/imu", /* priority */ 128, /* express */ 1, /* reliable */ 0);for (;;) { adamo_publisher_put(pub, (const uint8_t*)payload, len); usleep(10000);}adamo_publisher_free(pub);Subscribing
Section titled “Subscribing”Iterator (blocking)
Section titled “Iterator (blocking)”with session.subscribe("my-arm/sensors/**") as sub: for sample in sub: print(sample.key, len(sample.payload))use std::time::Duration;let sub = session.subscribe("my-arm/sensors/**")?;loop { let sample = sub.recv(Some(Duration::from_secs(5)))?; println!("{}: {} bytes", sample.key, sample.payload.len());}adamo_subscriber_t *sub = adamo_subscribe(sess, "my-arm/sensors/**");for (;;) { adamo_sample_t *s = adamo_sub_recv(sub, /* timeout_ms */ 0); if (!s) break; printf("%s: %zu bytes\n", s->key, s->payload_len); adamo_sample_free(s);}Callback (non-blocking)
Section titled “Callback (non-blocking)”The callback fires on a background receive thread. Keep it short — offload heavy work via a queue.
def on_sample(sample): print(sample.key, sample.payload)
sub = session.subscribe("my-arm/sensors/**", callback=on_sample)let _sub = session.subscribe_with("my-arm/sensors/**", |s| { println!("{}: {} bytes", s.key, s.payload.len());})?;void on_sample(const adamo_sample_t *s, void *user) { printf("%s: %zu bytes\n", s->key, s->payload_len);}adamo_cb_sub_t *sub = adamo_subscribe_cb(sess, "my-arm/sensors/**", on_sample, NULL);Wildcards
Section titled “Wildcards”Two wildcards are supported in key expressions:
| Wildcard | Meaning |
|---|---|
* | One path segment (no /). |
** | Zero or more path segments (including /). |
my-arm/sensors/* — one level under sensors/my-arm/sensors/** — everything under sensors/*/sensors/imu — IMU from any robot in the org**/control/joy — joy commands from anywhereOne-Shot Queries
Section titled “One-Shot Queries”A query is a request/reply on a key — useful for “what’s the current value?” against persistent stores or for asking a robot to introspect itself.
for sample in session.get("my-arm/config/**", timeout_ms=2000): print(sample.key, sample.payload)use std::time::Duration;let replies = session.get("my-arm/config/**", Duration::from_secs(2))?;for s in replies { println!("{}: {} bytes", s.key, s.payload.len());}size_t count = 0;adamo_sample_t **replies = adamo_get(sess, "my-arm/config/**", 2000, &count);for (size_t i = 0; i < count; i++) { printf("%s: %zu bytes\n", replies[i]->key, replies[i]->payload_len);}adamo_get_replies_free(replies, count);Priority and Reliability
Section titled “Priority and Reliability”Three knobs control how a message travels. The defaults are tuned for telemetry; control needs different settings.
| Knob | Values | What it controls |
|---|---|---|
| priority | 0–255 (mapped to 8 priority classes) | Which queue the message lives in. Higher drains first. |
| congestion control | DROP (default) or BLOCK | When the queue is full, drop the message vs. block the sender. |
| reliability | BEST_EFFORT (default) or RELIABLE | Retransmit on loss vs. fire-and-forget. |
| express | false (default) or true | Bypass batching for lower latency at the cost of more packets. |
Conventions in this project:
- Control commands → priority
250(REAL_TIME),DROP,BEST_EFFORT,express=true. A late command is worse than no command. - Video → priority
200(INTERACTIVE_HIGH),DROP,BEST_EFFORT. The encoder will recover from loss. - Telemetry → priority
128(DATA, the default),DROP,BEST_EFFORT. Cheap to lose. - Configuration / commands that must arrive → priority
~140,BLOCK,RELIABLE. Use sparingly.
Robotics priority is the opposite of WebRTC — control beats video beats audio.