Skip to Content
Realtime Subscriptions

Realtime Subscriptions

The Rust SDK supports live queries over a shared WebSocket connection.

Ownership boundary:

  • link-common owns the shared connection, reconnect loop, replay filtering, and SeqId checkpoint tracking.
  • Your app owns the async loop over live.next() or events.next().

Live SQL must follow the strict supported shape: SELECT ... FROM ... WHERE ....

Do not include ORDER BY or LIMIT in live subscription SQL. Apply ordering or row caps in application code after rows arrive, or use LiveRowsConfig::limit.

Call client.connect().await? before opening subscriptions so they multiplex over one socket (unless you rely on lazy connect on first subscription).

Which API to use

APIUse when
live() / live_with_config()You want the current materialized row list (recommended for UI and most services)
live_events() / live_events_with_config()You need raw ChangeEvent frames for custom reconciliation or debugging

Unlike execute_query, live APIs take the final SQL string directly — there is no separate params argument. Embed constants in the WHERE clause or use separate subscriptions per filter.

Materialized live rows

RUST
use kalam_client::{    LiveRowsConfig, LiveRowsEvent, SubscriptionConfig, SubscriptionOptions,}; client.connect().await?; let mut config = SubscriptionConfig::new(    "inbox",    "SELECT id, room, body FROM app.messages WHERE room = 'main'",);config.options = Some(    SubscriptionOptions::new()        .with_last_rows(20)        .with_batch_size(50),); let mut live = client    .live_with_config(        config,        LiveRowsConfig {            limit: Some(20),            ..LiveRowsConfig::default()        },    )    .await?; while let Some(event) = live.next().await {    match event? {        LiveRowsEvent::Rows { rows, last_seq_id, .. } => {            println!("{} rows, last_seq_id={last_seq_id:?}", rows.len());            for row in rows {                println!("{}", row.get("body").and_then(|v| v.as_text()).unwrap_or(""));            }        }        LiveRowsEvent::Error { code, message, .. } => {            eprintln!("live error {code}: {message}");            break;        }    }} live.close().await?;

Shorthand when defaults are enough:

RUST
let mut live = client.live("SELECT * FROM app.tasks WHERE status = 'open'").await?;

Control knobs

KnobLayerEffect
with_batch_size(n)Server snapshotChunks initial rows from the server
with_last_rows(n)Server rewindHow much history to fetch on connect
LiveRowsConfig { limit: Some(n), .. }Client materializerCaps rows kept in memory after reconciliation
with_from(seq_id)ResumeContinue from a persisted SeqId
RUST
use kalam_client::{SeqId, SubscriptionOptions}; let options = SubscriptionOptions::new()    .with_last_rows(200)    .with_from(SeqId::from(42_i64));

Each LiveRowsEvent::Rows includes last_seq_id for durable checkpoints.

Low-level change events

RUST
use kalam_client::{ChangeEvent, SubscriptionConfig}; let config = SubscriptionConfig::new(    "raw-inbox",    "SELECT * FROM app.messages WHERE room = 'main'",); let mut events = client.live_events_with_config(config).await?; while let Some(change) = events.next().await {    match change? {        ChangeEvent::Insert { rows, .. } => println!("inserted {}", rows.len()),        ChangeEvent::Update { rows, .. } => println!("updated {}", rows.len()),        ChangeEvent::Delete { old_rows, .. } => println!("deleted {}", old_rows.len()),        other => println!("event: {:?}", other),    }} events.close().await?;

Inspect active subscriptions

RUST
let subs = client.subscriptions().await;for info in subs {    println!("{} — {}", info.id, info.query);}

Cancel one explicitly:

RUST
client.cancel_subscription("subscription-id").await?;

Topic consumers (optional)

Topic workers are behind the consumer feature — not a separate crate.

TOML
kalam-client = { version = "0.5", features = ["native-sdk", "consumer"] }
RUST
use std::time::Duration;use kalam_client::AutoOffsetReset; let mut consumer = client    .consumer()    .group_id("billing-workers")    .topic("orders.events")    .auto_offset_reset(AutoOffsetReset::Earliest)    .enable_auto_commit(false)    .poll_timeout(Duration::from_secs(10))    .build()?; let records = consumer.poll().await?;for record in &records {    // process record.payload    consumer.mark_processed(record);}consumer.commit_sync().await?;consumer.close().await?;

Full walkthrough: Topic Consumer example.

Next

Last updated on