Rust SDK
The KalamDB Rust SDK is the native async client surface for Rust services, CLIs, and background workers that need SQL, realtime subscriptions, FILE uploads, or topic consume/ack flows.
Status: The Rust SDK is in beta. The source packages are available in the KalamDB repository; crates.io publishing is still planned.
Package Layout
The Rust SDK lives under link/ in the KalamDB source repository.
| Crate | Use it for |
|---|---|
| kalam-client | App-facing client with KalamLinkClient, auth, SQL, WebSocket subscriptions, live rows, FILE uploads, and optional consumer helpers |
| kalam-consumer | Worker-focused crate layered on kalam-client for topic consumers |
| link-common | Shared implementation crate used internally by the public client crates |
Install From Source
Local path dependency
Use this while developing against a local checkout of github.com/kalamstack/KalamDB.
    [dependencies]
    kalam-client = { path = "../KalamDB/link/kalam-client", features = ["native-full"] }
    kalam-consumer = { path = "../KalamDB/link/kalam-consumer" }
    tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
    serde_json = "1"

Adjust the path so it points at your local KalamDB checkout.
Git dependency
Use this when your app should pull the SDK directly from GitHub.
    [dependencies]
    kalam-client = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-client", features = ["native-full"] }
    kalam-consumer = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-consumer" }
    tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
    serde_json = "1"

Pin a revision for reproducible builds:
    kalam-client = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-client", rev = "<commit-sha>", features = ["native-full"] }

Cargo Publishing TODOs
- TODO: Publish kalam-client to crates.io once the public Rust API is frozen for a beta release.
- TODO: Publish kalam-consumer to crates.io after kalam-client is available as a registry dependency.
- TODO: Replace the source install snippets with cargo add kalam-client --features native-full and cargo add kalam-consumer after publishing.
- TODO: Add package badges and docs.rs links when the crates are published.
Connect And Run SQL
KalamLinkClient::builder() requires a base_url. Use AuthProvider::basic_auth(...) for login bootstrap or AuthProvider::jwt_token(...) when you already have a token.
    use kalam_client::{AuthProvider, KalamLinkClient, Result};

    #[tokio::main]
    async fn main() -> Result<()> {
        // base_url is required; basic auth acts as the login bootstrap.
        let client = KalamLinkClient::builder()
            .base_url("http://localhost:2900")
            .auth(AuthProvider::basic_auth(
                "admin".to_string(),
                "AdminPass123!".to_string(),
            ))
            .build()?;

        let response = client.execute_query("SELECT 1 AS ok", None, None, None).await?;
        if response.success() {
            println!("rows: {:?}", response.rows_as_maps());
        }
        Ok(())
    }

Authenticate Explicitly
You can log in first, then build a JWT-backed client for SQL and WebSocket work.
    use kalam_client::{AuthProvider, KalamLinkClient, Result};

    async fn make_client() -> Result<KalamLinkClient> {
        // Unauthenticated client used only to log in.
        let bootstrap = KalamLinkClient::builder()
            .base_url("http://localhost:2900")
            .build()?;

        let login = bootstrap.login("admin", "AdminPass123!").await?;

        // JWT-backed client for SQL and WebSocket work.
        KalamLinkClient::builder()
            .base_url("http://localhost:2900")
            .auth(AuthProvider::jwt_token(login.access_token))
            .build()
    }

For OAuth, OIDC, or secret-manager backed auth, implement DynamicAuthProvider and pass it with .auth_provider(...). The provider is called on connect and reconnect so it can return a fresh JWT.
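The "return a fresh JWT" part of such a provider often comes down to caching a token and refetching it shortly before it expires. The sketch below shows only that caching logic with the standard library. It is an illustration under assumptions, not SDK code: TokenCache, its fetch closure, and the skew margin are hypothetical names, and the DynamicAuthProvider trait itself is not implemented here because its signature is not shown in this document.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of kalam-client): caches a token and
/// refetches it once it is within `skew` of its expiry.
struct TokenCache<F: FnMut() -> (String, Duration)> {
    fetch: F,                          // returns (token, time-to-live)
    cached: Option<(String, Instant)>, // token and the instant it expires
    skew: Duration,                    // safety margin before expiry
}

impl<F: FnMut() -> (String, Duration)> TokenCache<F> {
    fn new(fetch: F, skew: Duration) -> Self {
        Self { fetch, cached: None, skew }
    }

    /// Returns the cached token while it is still valid, otherwise fetches a new one.
    fn token(&mut self) -> String {
        let now = Instant::now();
        if let Some((token, expires_at)) = &self.cached {
            if now + self.skew < *expires_at {
                return token.clone();
            }
        }
        let (token, ttl) = (self.fetch)();
        self.cached = Some((token.clone(), now + ttl));
        token
    }
}

fn main() {
    let mut calls = 0u32;
    // Stand-in for a call to an identity provider or secret manager.
    let mut cache = TokenCache::new(
        || {
            calls += 1;
            (format!("jwt-{calls}"), Duration::from_secs(3600))
        },
        Duration::from_secs(60),
    );
    let first = cache.token();
    let second = cache.token(); // served from the cache, no second fetch
    println!("{first} {second}");
}
```

A provider built this way would call token() each time the SDK asks for credentials, so a reconnect after a long idle period picks up a new JWT instead of an expired one.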
Parameterized Queries
Use $1, $2, and so on in SQL, then pass serde_json::Value parameters.
    // Positional parameters bind to $1 and $2 in order.
    let params = vec![
        serde_json::json!("conv_42"),
        serde_json::json!(false),
    ];

    let response = client
        .execute_query(
            "SELECT * FROM app.messages WHERE conversation_id = $1 AND archived = $2",
            None,
            Some(params),
            None,
        )
        .await?;

    for row in response.rows_as_maps() {
        println!("message: {:?}", row);
    }

Namespace-Scoped Queries
Pass a namespace when your SQL uses unqualified table names.
    // The last argument is the namespace used to resolve the unqualified table name.
    let response = client
        .execute_query("SELECT * FROM messages LIMIT 20", None, None, Some("chat"))
        .await?;

FILE Uploads
Enable the native-full feature or at least the file-uploads feature, then reference files with FILE("name") placeholders.
    let avatar = tokio::fs::read("./avatar.png").await?;

    let response = client
        .execute_with_files(
            "INSERT INTO app.users (name, avatar) VALUES ($1, FILE(\"avatar\"))",
            vec![("avatar", "avatar.png", avatar, Some("image/png"))],
            Some(vec![serde_json::json!("Alice")]),
            None,
        )
        .await?;

Realtime Subscriptions
Live events use the shared WebSocket connection. With the default lazy connection settings, live_events(...) connects automatically on the first stream.
    use kalam_client::{ChangeEvent, Result};

    async fn watch_messages(client: &kalam_client::KalamLinkClient) -> Result<()> {
        let mut subscription = client.live_events("SELECT * FROM app.messages").await?;

        while let Some(event) = subscription.next().await {
            match event? {
                ChangeEvent::Insert { rows, .. } => println!("inserted: {:?}", rows),
                ChangeEvent::Update { rows, .. } => println!("updated: {:?}", rows),
                ChangeEvent::Delete { old_rows, .. } => println!("deleted: {:?}", old_rows),
                other => println!("subscription event: {:?}", other),
            }
        }
        Ok(())
    }

Live Row Snapshots
Use live_query_rows(...) when your app wants materialized row snapshots instead of raw insert/update/delete events.
    use kalam_client::{LiveRowsEvent, Result};

    async fn live_rows(client: &kalam_client::KalamLinkClient) -> Result<()> {
        let mut rows = client.live_query_rows("SELECT * FROM app.messages").await?;

        while let Some(event) = rows.next().await {
            match event? {
                LiveRowsEvent::Rows { rows, .. } => println!("current rows: {:?}", rows),
                LiveRowsEvent::Error { code, message, .. } => {
                    eprintln!("live rows error {code}: {message}");
                }
            }
        }
        Ok(())
    }

Topic Consumers
Enable the consumer feature through native-full, then use client.consumer() for background workers.
    use std::time::Duration;

    use kalam_client::{AutoOffsetReset, Result};

    async fn run_worker(client: &kalam_client::KalamLinkClient) -> Result<()> {
        let mut consumer = client
            .consumer()
            .group_id("billing-worker")
            .topic("orders.events")
            .auto_offset_reset(AutoOffsetReset::Earliest)
            .enable_auto_commit(false)
            .max_poll_records(25)
            .poll_timeout(Duration::from_secs(10))
            .build()?;

        let records = consumer.poll().await?;
        for record in &records {
            println!("offset={} bytes={}", record.offset, record.payload.len());
            consumer.mark_processed(record);
        }

        // Auto-commit is disabled above, so commit explicitly.
        let commit = consumer.commit_sync().await?;
        println!("committed offset {}", commit.acknowledged_offset);

        consumer.close().await?;
        Ok(())
    }

Common Pitfalls
- base_url(...) is required on every KalamLinkClient builder.
- User/password auth is a login bootstrap. The SDK exchanges it for JWT auth before protected SQL, topic, or WebSocket requests.
- execute_with_files(...) requires the file-uploads feature. The native-full feature includes it.
- client.consumer() requires the consumer feature. The native-full feature includes it.
- Long-running services should close subscriptions and consumers with close() during shutdown.
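
The last pitfall can be sketched with a shared shutdown flag. This is a minimal std-only illustration, not SDK code: MockConsumer is a hypothetical stand-in for a subscription or consumer, and its synchronous poll() and close() only mimic the shape of the async SDK calls. In a tokio service the same idea would more likely use an async shutdown signal followed by close().await.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for an SDK subscription or consumer.
struct MockConsumer {
    closed: bool,
}

impl MockConsumer {
    /// Simulates one blocking poll.
    fn poll(&mut self) {
        thread::sleep(Duration::from_millis(5));
    }

    /// Records that the consumer was closed.
    fn close(&mut self) {
        self.closed = true;
    }
}

/// Polls until `shutdown` is set, then closes the consumer before returning it.
fn run_until_shutdown(mut consumer: MockConsumer, shutdown: Arc<AtomicBool>) -> MockConsumer {
    while !shutdown.load(Ordering::SeqCst) {
        consumer.poll();
    }
    consumer.close();
    consumer
}

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&shutdown);
    let worker = thread::spawn(move || run_until_shutdown(MockConsumer { closed: false }, flag));

    // Let the worker poll a few times, then request shutdown.
    thread::sleep(Duration::from_millis(20));
    shutdown.store(true, Ordering::SeqCst);

    let consumer = worker.join().expect("worker panicked");
    println!("closed: {}", consumer.closed);
}
```

The point of the pattern is that close() runs on the normal exit path of the worker loop, so the service does not rely on process termination to release the connection.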