Overview

Rust SDK

The KalamDB Rust SDK is the native async client surface for Rust services, CLIs, and background workers that need SQL, realtime subscriptions, FILE uploads, or topic consume/ack flows.

Status: The Rust SDK is in beta. The source packages are available in the KalamDB repository; crates.io publishing is still planned.

Package Layout

The Rust SDK lives under link/ in the KalamDB source repository.

| Crate | Use it for |
| --- | --- |
| kalam-client | App-facing client with KalamLinkClient, auth, SQL, WebSocket subscriptions, live rows, FILE uploads, and optional consumer helpers |
| kalam-consumer | Worker-focused crate layered on kalam-client for topic consumers |
| link-common | Shared implementation crate used internally by the public client crates |

Install From Source

Local path dependency

Use this while developing against a local checkout of github.com/kalamstack/KalamDB.

[dependencies]
kalam-client = { path = "../KalamDB/link/kalam-client", features = ["native-full"] }
kalam-consumer = { path = "../KalamDB/link/kalam-consumer" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1"

Adjust the path so it points at your local KalamDB checkout.

Git dependency

Use this when your app should pull the SDK directly from GitHub.

[dependencies]
kalam-client = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-client", features = ["native-full"] }
kalam-consumer = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-consumer" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1"

Pin a revision for reproducible builds:

kalam-client = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-client", rev = "<commit-sha>", features = ["native-full"] }

Cargo Publishing TODOs

  • TODO: Publish kalam-client to crates.io once the public Rust API is frozen for a beta release.
  • TODO: Publish kalam-consumer to crates.io after kalam-client is available as a registry dependency.
  • TODO: Replace the source install snippets with cargo add kalam-client --features native-full and cargo add kalam-consumer after publishing.
  • TODO: Add package badges and docs.rs links when the crates are published.

Connect And Run SQL

KalamLinkClient::builder() requires a base_url. Use AuthProvider::basic_auth(...) for login bootstrap or AuthProvider::jwt_token(...) when you already have a token.

use kalam_client::{AuthProvider, KalamLinkClient, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let client = KalamLinkClient::builder()
        .base_url("http://localhost:8080")
        .auth(AuthProvider::basic_auth(
            "admin".to_string(),
            "AdminPass123!".to_string(),
        ))
        .build()?;

    let response = client.execute_query("SELECT 1 AS ok", None, None, None).await?;
    if response.success() {
        println!("rows: {:?}", response.rows_as_maps());
    }

    Ok(())
}

Authenticate Explicitly

You can log in first, then build a JWT-backed client for SQL and WebSocket work.

use kalam_client::{AuthProvider, KalamLinkClient, Result};

async fn make_client() -> Result<KalamLinkClient> {
    let bootstrap = KalamLinkClient::builder()
        .base_url("http://localhost:8080")
        .build()?;

    let login = bootstrap.login("admin", "AdminPass123!").await?;

    KalamLinkClient::builder()
        .base_url("http://localhost:8080")
        .auth(AuthProvider::jwt_token(login.access_token))
        .build()
}

For OAuth, OIDC, or secret-manager backed auth, implement DynamicAuthProvider and pass it with .auth_provider(...). The provider is called on connect and reconnect so it can return a fresh JWT.
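Because the provider is called on every connect and reconnect, it can help to cache the token and only fetch a new one when the cached one is close to expiry. The sketch below shows that caching logic using only the standard library. It deliberately does not implement DynamicAuthProvider, whose exact signature is not shown on this page; TokenCache, CachedToken, and the fetch closure are hypothetical names for illustration.

```rust
use std::time::{Duration, Instant};

/// Hypothetical cached credential: a JWT string plus the instant it expires.
struct CachedToken {
    jwt: String,
    expires_at: Instant,
}

/// Hypothetical cache that a dynamic auth provider could wrap.
/// `fetch` stands in for whatever obtains a new token (OAuth, OIDC,
/// a secret manager) and returns the token with its time to live.
pub struct TokenCache<F: FnMut() -> (String, Duration)> {
    fetch: F,
    current: Option<CachedToken>,
    /// Refresh this long before the token actually expires.
    skew: Duration,
}

impl<F: FnMut() -> (String, Duration)> TokenCache<F> {
    pub fn new(fetch: F, skew: Duration) -> Self {
        Self { fetch, current: None, skew }
    }

    /// Return a token that is still valid at `now`, fetching a new one
    /// when nothing is cached or the cached token is within `skew` of expiry.
    pub fn token_at(&mut self, now: Instant) -> String {
        if let Some(token) = &self.current {
            if now + self.skew < token.expires_at {
                return token.jwt.clone();
            }
        }
        let (jwt, ttl) = (self.fetch)();
        self.current = Some(CachedToken {
            jwt: jwt.clone(),
            expires_at: now + ttl,
        });
        jwt
    }
}
```

Passing the current time in as an argument keeps the cache easy to test; a real provider would likely call Instant::now() itself and hand the returned string to the SDK.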

Parameterized Queries

Use $1, $2, and so on in SQL, then pass serde_json::Value parameters.

let params = vec![
    serde_json::json!("conv_42"),
    serde_json::json!(false),
];

let response = client
    .execute_query(
        "SELECT * FROM app.messages WHERE conversation_id = $1 AND archived = $2",
        None,
        Some(params),
        None,
    )
    .await?;

for row in response.rows_as_maps() {
    println!("message: {:?}", row);
}
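A mismatch between the number of placeholders and the number of parameters is an easy mistake to make. The helper below is a minimal sketch of a pre-flight check, assuming placeholders are numbered contiguously from $1. It is not part of the SDK: max_placeholder and params_match are hypothetical names, and the scan is naive (it does not skip string literals or comments, so a "$1" inside a quoted literal would be counted).

```rust
/// Return the highest `$n` placeholder index found in `sql`, or 0 if none.
/// Naive scan: does not skip string literals or comments.
pub fn max_placeholder(sql: &str) -> usize {
    let bytes = sql.as_bytes();
    let mut max = 0;
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'$' {
            // Collect the run of ASCII digits that follows the `$`.
            let start = i + 1;
            let mut end = start;
            while end < bytes.len() && bytes[end].is_ascii_digit() {
                end += 1;
            }
            if end > start {
                if let Ok(n) = sql[start..end].parse::<usize>() {
                    max = max.max(n);
                }
            }
            i = end;
        } else {
            i += 1;
        }
    }
    max
}

/// True when the highest placeholder index equals the parameter count.
pub fn params_match(sql: &str, param_count: usize) -> bool {
    max_placeholder(sql) == param_count
}
```

A caller could run params_match(sql, params.len()) before execute_query(...) and fail fast in debug builds when the counts disagree.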

Namespace-Scoped Queries

Pass a namespace when your SQL uses unqualified table names.

let response = client
    .execute_query("SELECT * FROM messages LIMIT 20", None, None, Some("chat"))
    .await?;

FILE Uploads

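Enable the native-full feature, or at least the file-uploads feature, then reference each uploaded file in SQL with a FILE("name") placeholder. In the example, each tuple pairs the placeholder name with a file name, the file bytes, and an optional MIME type. The example reads the file with tokio::fs, which requires tokio's fs feature in addition to the macros and rt-multi-thread features listed in the install snippets.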

let avatar = tokio::fs::read("./avatar.png").await?;

let response = client
    .execute_with_files(
        "INSERT INTO app.users (name, avatar) VALUES ($1, FILE(\"avatar\"))",
        vec![("avatar", "avatar.png", avatar, Some("image/png"))],
        Some(vec![serde_json::json!("Alice")]),
        None,
    )
    .await?;

Realtime Subscriptions

Subscriptions use the shared WebSocket connection. With the default lazy connection settings, subscribe(...) connects automatically on the first subscription.

use kalam_client::{ChangeEvent, Result};

async fn watch_messages(client: &kalam_client::KalamLinkClient) -> Result<()> {
    let mut subscription = client.subscribe("SELECT * FROM app.messages").await?;

    while let Some(event) = subscription.next().await {
        match event? {
            ChangeEvent::Insert { rows, .. } => println!("inserted: {:?}", rows),
            ChangeEvent::Update { rows, .. } => println!("updated: {:?}", rows),
            ChangeEvent::Delete { old_rows, .. } => println!("deleted: {:?}", old_rows),
            other => println!("subscription event: {:?}", other),
        }
    }

    Ok(())
}

Live Row Snapshots

Use live_query_rows(...) when your app wants materialized row snapshots instead of raw insert/update/delete events.

use kalam_client::{LiveRowsEvent, Result};

async fn live_rows(client: &kalam_client::KalamLinkClient) -> Result<()> {
    let mut rows = client.live_query_rows("SELECT * FROM app.messages").await?;

    while let Some(event) = rows.next().await {
        match event? {
            LiveRowsEvent::Rows { rows, .. } => println!("current rows: {:?}", rows),
            LiveRowsEvent::Error { code, message, .. } => {
                eprintln!("live rows error {code}: {message}");
            }
        }
    }

    Ok(())
}
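To see what a materialized snapshot saves you, consider what an app has to do when it consumes raw insert/update/delete events instead: keep its own keyed copy of the rows and fold every event into it. The sketch below illustrates that bookkeeping with stand-in types. Row and Change are hypothetical and only mirror the rows / old_rows shape used in the subscription example; the SDK's real row representation and its internal snapshot logic are not shown on this page.

```rust
use std::collections::BTreeMap;

/// Stand-in row type; the SDK's real row representation is not shown here.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub id: u64,
    pub body: String,
}

/// Stand-in for the insert/update/delete shape of `ChangeEvent`.
pub enum Change {
    Insert { rows: Vec<Row> },
    Update { rows: Vec<Row> },
    Delete { old_rows: Vec<Row> },
}

/// Apply one change to a local snapshot keyed by row id.
pub fn apply(snapshot: &mut BTreeMap<u64, Row>, change: Change) {
    match change {
        // Inserts and updates both upsert by key.
        Change::Insert { rows } | Change::Update { rows } => {
            for row in rows {
                snapshot.insert(row.id, row);
            }
        }
        // Deletes remove the rows that previously existed.
        Change::Delete { old_rows } => {
            for row in old_rows {
                snapshot.remove(&row.id);
            }
        }
    }
}
```

With live_query_rows(...), the app receives the current rows directly and does not need to maintain a map like this one.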

Topic Consumers

Enable the consumer feature (included in native-full), then use client.consumer() to build consumers for background workers.

use std::time::Duration;

use kalam_client::{AutoOffsetReset, Result};

async fn run_worker(client: &kalam_client::KalamLinkClient) -> Result<()> {
    let mut consumer = client
        .consumer()
        .group_id("billing-worker")
        .topic("orders.events")
        .auto_offset_reset(AutoOffsetReset::Earliest)
        .enable_auto_commit(false)
        .max_poll_records(25)
        .poll_timeout(Duration::from_secs(10))
        .build()?;

    let records = consumer.poll().await?;
    for record in &records {
        println!("offset={} bytes={}", record.offset, record.payload.len());
        consumer.mark_processed(record);
    }

    let commit = consumer.commit_sync().await?;
    println!("committed offset {}", commit.acknowledged_offset);

    consumer.close().await?;
    Ok(())
}
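With auto-commit disabled, a worker that stops between processing a record and committing its offset may see that record again on the next poll. That is the usual at-least-once trade-off; this page does not spell out KalamDB's exact redelivery semantics, so treat it as an assumption. One common defence is to make handling idempotent by remembering the last offset already applied. The sketch below is a hypothetical OffsetGuard, not an SDK type, and it assumes offsets are u64 values that increase monotonically within the topic being consumed.

```rust
/// Hypothetical guard that skips records at or below the last applied offset.
/// Assumes offsets increase monotonically within the consumed topic.
#[derive(Debug, Default)]
pub struct OffsetGuard {
    last_applied: Option<u64>,
}

impl OffsetGuard {
    /// True if the record at `offset` has not been applied yet.
    pub fn should_process(&self, offset: u64) -> bool {
        match self.last_applied {
            Some(last) => offset > last,
            None => true,
        }
    }

    /// Record that the side effects for `offset` have been applied.
    pub fn mark_applied(&mut self, offset: u64) {
        let newest = self.last_applied.map_or(offset, |last| last.max(offset));
        self.last_applied = Some(newest);
    }
}
```

An in-memory guard only protects against duplicates within one process. To survive a restart, the last applied offset would need to live in durable storage alongside the side effects it protects.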

Common Pitfalls

  • base_url(...) is required on every KalamLinkClient builder.
  • User/password auth is a login bootstrap. The SDK exchanges it for JWT auth before protected SQL, topic, or WebSocket requests.
  • execute_with_files(...) requires the file-uploads feature. The native-full feature includes it.
  • client.consumer() requires the consumer feature. The native-full feature includes it.
  • Long-running services should close subscriptions and consumers with close() during shutdown.
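The last point can be sketched as a loop that checks a shared stop flag and always runs cleanup once the loop exits. The example below uses only the standard library and a synchronous loop to keep it self-contained; run_until_stopped is a hypothetical helper, and in an async service the cleanup step would be where close().await is called on each subscription and consumer.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Run `work` repeatedly until `stop` is set, then run `cleanup` exactly once.
/// Returns how many iterations of `work` ran.
pub fn run_until_stopped(
    stop: Arc<AtomicBool>,
    mut work: impl FnMut(),
    cleanup: impl FnOnce(),
) -> usize {
    let mut iterations = 0;
    while !stop.load(Ordering::Acquire) {
        work();
        iterations += 1;
    }
    // Cleanup runs after the loop regardless of how many iterations ran.
    cleanup();
    iterations
}
```

A signal handler or supervisor task would set the flag with stop.store(true, Ordering::Release); the worker then finishes its current iteration and releases its resources before exiting.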
