Rust SDK
The KalamDB Rust SDK is the native async client surface for Rust services, CLIs, and background workers that need SQL, realtime subscriptions, FILE uploads, or topic consume/ack flows.
Status: The Rust SDK is in beta. The source packages are available in the KalamDB repository; crates.io publishing is still planned.
Package Layout
The Rust SDK lives under link/ in the KalamDB source repository.
| Crate | Use it for |
|---|---|
| kalam-client | App-facing client with KalamLinkClient, auth, SQL, WebSocket subscriptions, live rows, FILE uploads, and optional consumer helpers |
| kalam-consumer | Worker-focused crate layered on kalam-client for topic consumers |
| link-common | Shared implementation crate used internally by the public client crates |
Install From Source
Local path dependency
Use this while developing against a local checkout of github.com/kalamstack/KalamDB.
[dependencies]
kalam-client = { path = "../KalamDB/link/kalam-client", features = ["native-full"] }
kalam-consumer = { path = "../KalamDB/link/kalam-consumer" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1"

Adjust the path so it points at your local KalamDB checkout.
Git dependency
Use this when your app should pull the SDK directly from GitHub.
[dependencies]
kalam-client = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-client", features = ["native-full"] }
kalam-consumer = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-consumer" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1"

Pin a revision for reproducible builds:
kalam-client = { git = "https://github.com/kalamstack/KalamDB.git", package = "kalam-client", rev = "<commit-sha>", features = ["native-full"] }

Cargo Publishing TODOs
- TODO: Publish kalam-client to crates.io once the public Rust API is frozen for a beta release.
- TODO: Publish kalam-consumer to crates.io after kalam-client is available as a registry dependency.
- TODO: Replace the source install snippets with cargo add kalam-client --features native-full and cargo add kalam-consumer after publishing.
- TODO: Add package badges and docs.rs links when the crates are published.
Connect And Run SQL
KalamLinkClient::builder() requires a base_url. Use AuthProvider::basic_auth(...) for login bootstrap or AuthProvider::jwt_token(...) when you already have a token.
use kalam_client::{AuthProvider, KalamLinkClient, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let client = KalamLinkClient::builder()
        .base_url("http://localhost:8080")
        .auth(AuthProvider::basic_auth(
            "admin".to_string(),
            "AdminPass123!".to_string(),
        ))
        .build()?;

    let response = client.execute_query("SELECT 1 AS ok", None, None, None).await?;
    if response.success() {
        println!("rows: {:?}", response.rows_as_maps());
    }
    Ok(())
}

Authenticate Explicitly
You can log in first, then build a JWT-backed client for SQL and WebSocket work.
use kalam_client::{AuthProvider, KalamLinkClient, Result};

async fn make_client() -> Result<KalamLinkClient> {
    let bootstrap = KalamLinkClient::builder()
        .base_url("http://localhost:8080")
        .build()?;

    let login = bootstrap.login("admin", "AdminPass123!").await?;

    KalamLinkClient::builder()
        .base_url("http://localhost:8080")
        .auth(AuthProvider::jwt_token(login.access_token))
        .build()
}

For OAuth, OIDC, or secret-manager backed auth, implement DynamicAuthProvider and pass it with .auth_provider(...). The provider is called on connect and reconnect so it can return a fresh JWT.
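Because the provider may be called on every reconnect, it can help to cache the last token and only fetch a new one when it is close to expiry. The following is a minimal sketch of that caching logic under stated assumptions: TokenSource, CachedToken, and CountingSource are hypothetical names invented for illustration, the trait is synchronous for brevity, and none of this reflects the actual DynamicAuthProvider signature, which may differ.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for whatever fetches a JWT (OIDC exchange, secret manager, ...).
/// This is NOT the SDK's DynamicAuthProvider trait.
pub trait TokenSource {
    /// Returns a token and how long it remains valid.
    fn fetch(&mut self) -> Result<(String, Duration), String>;
}

/// Caches the last token and refetches only when it is close to expiry.
pub struct CachedToken<S: TokenSource> {
    source: S,
    current: Option<(String, Instant)>, // token and the instant it expires
    refresh_margin: Duration,
}

impl<S: TokenSource> CachedToken<S> {
    pub fn new(source: S, refresh_margin: Duration) -> Self {
        Self { source, current: None, refresh_margin }
    }

    /// Returns a token that stays valid for more than `refresh_margin` past `now`.
    pub fn token_at(&mut self, now: Instant) -> Result<String, String> {
        if let Some((token, expires_at)) = &self.current {
            if now + self.refresh_margin < *expires_at {
                return Ok(token.clone());
            }
        }
        // No token yet, or the cached one is inside the refresh margin.
        let (token, ttl) = self.source.fetch()?;
        self.current = Some((token.clone(), now + ttl));
        Ok(token)
    }
}

/// Illustrative source that issues numbered tokens valid for 60 seconds.
pub struct CountingSource {
    pub calls: u32,
}

impl TokenSource for CountingSource {
    fn fetch(&mut self) -> Result<(String, Duration), String> {
        self.calls += 1;
        Ok((format!("token-{}", self.calls), Duration::from_secs(60)))
    }
}
```

A real provider would wrap logic like this behind whatever method the SDK trait requires and hand the cached token back on each connect or reconnect.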
Parameterized Queries
Use $1, $2, and so on in SQL, then pass serde_json::Value parameters.
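Since placeholders are positional, it is easy to pass a parameter vector whose length does not match the statement. A small pre-flight check can catch that before the request is sent. This is a sketch only: max_placeholder and check_param_count are hypothetical helpers, not part of the SDK, and the scan is naive (it assumes placeholders are numbered contiguously from $1 and does not skip string literals or comments).

```rust
/// Returns the highest `$n` placeholder index found in `sql`, or 0 if there is none.
/// Naive scan: `$n` inside string literals or comments is counted too.
pub fn max_placeholder(sql: &str) -> usize {
    let bytes = sql.as_bytes();
    let mut highest = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'$' {
            // Collect the run of ASCII digits that follows the dollar sign.
            let start = i + 1;
            let mut end = start;
            while end < bytes.len() && bytes[end].is_ascii_digit() {
                end += 1;
            }
            if end > start {
                if let Ok(n) = sql[start..end].parse::<usize>() {
                    highest = highest.max(n);
                }
            }
            i = end;
        } else {
            i += 1;
        }
    }
    highest
}

/// Checks that the number of parameters equals the highest placeholder index.
pub fn check_param_count(sql: &str, param_count: usize) -> Result<(), String> {
    let expected = max_placeholder(sql);
    if expected == param_count {
        Ok(())
    } else {
        Err(format!(
            "statement uses placeholders up to ${expected}, but {param_count} parameter(s) were supplied"
        ))
    }
}
```

Calling check_param_count(sql, params.len()) just before the query turns a mismatch into a local error instead of a server round trip.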
let params = vec![
    serde_json::json!("conv_42"),
    serde_json::json!(false),
];

let response = client
    .execute_query(
        "SELECT * FROM app.messages WHERE conversation_id = $1 AND archived = $2",
        None,
        Some(params),
        None,
    )
    .await?;

for row in response.rows_as_maps() {
    println!("message: {:?}", row);
}

Namespace-Scoped Queries
Pass a namespace when your SQL uses unqualified table names.
let response = client
    .execute_query("SELECT * FROM messages LIMIT 20", None, None, Some("chat"))
    .await?;

FILE Uploads
Enable the native-full feature or at least the file-uploads feature, then reference files with FILE("name") placeholders.
let avatar = tokio::fs::read("./avatar.png").await?;

let response = client
    .execute_with_files(
        "INSERT INTO app.users (name, avatar) VALUES ($1, FILE(\"avatar\"))",
        vec![("avatar", "avatar.png", avatar, Some("image/png"))],
        Some(vec![serde_json::json!("Alice")]),
        None,
    )
    .await?;

Realtime Subscriptions
Subscriptions use the shared WebSocket connection. With the default lazy connection settings, subscribe(...) connects automatically on the first subscription.
use kalam_client::{ChangeEvent, Result};

async fn watch_messages(client: &kalam_client::KalamLinkClient) -> Result<()> {
    let mut subscription = client.subscribe("SELECT * FROM app.messages").await?;

    while let Some(event) = subscription.next().await {
        match event? {
            ChangeEvent::Insert { rows, .. } => println!("inserted: {:?}", rows),
            ChangeEvent::Update { rows, .. } => println!("updated: {:?}", rows),
            ChangeEvent::Delete { old_rows, .. } => println!("deleted: {:?}", old_rows),
            other => println!("subscription event: {:?}", other),
        }
    }
    Ok(())
}

Live Row Snapshots
Use live_query_rows(...) when your app wants materialized row snapshots instead of raw insert/update/delete events.
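To make the distinction concrete, this is roughly what an app would otherwise do by hand with raw change events: fold them into a keyed snapshot. The sketch uses hypothetical types and functions (Change, apply, materialize) and a made-up u64 row id. It is not the SDK's ChangeEvent and makes no claim about how live_query_rows(...) is implemented.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified change event keyed by a row id.
/// The SDK's ChangeEvent carries row batches and has more variants.
#[derive(Debug, Clone, PartialEq)]
pub enum Change {
    Insert { id: u64, body: String },
    Update { id: u64, body: String },
    Delete { id: u64 },
}

/// Applies one change to the snapshot in place.
pub fn apply(snapshot: &mut BTreeMap<u64, String>, change: Change) {
    match change {
        // Inserts and updates both leave the latest body stored under the id.
        Change::Insert { id, body } | Change::Update { id, body } => {
            snapshot.insert(id, body);
        }
        Change::Delete { id } => {
            snapshot.remove(&id);
        }
    }
}

/// Folds a sequence of changes into the current set of rows.
pub fn materialize(changes: impl IntoIterator<Item = Change>) -> BTreeMap<u64, String> {
    let mut snapshot = BTreeMap::new();
    for change in changes {
        apply(&mut snapshot, change);
    }
    snapshot
}
```

With row snapshots the app receives the current rows directly and does not have to keep a structure like this in sync itself.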
use kalam_client::{LiveRowsEvent, Result};

async fn live_rows(client: &kalam_client::KalamLinkClient) -> Result<()> {
    let mut rows = client.live_query_rows("SELECT * FROM app.messages").await?;

    while let Some(event) = rows.next().await {
        match event? {
            LiveRowsEvent::Rows { rows, .. } => println!("current rows: {:?}", rows),
            LiveRowsEvent::Error { code, message, .. } => {
                eprintln!("live rows error {code}: {message}");
            }
        }
    }
    Ok(())
}

Topic Consumers
Enable the consumer feature through native-full, then use client.consumer() for background workers.
use std::time::Duration;

use kalam_client::{AutoOffsetReset, Result};

async fn run_worker(client: &kalam_client::KalamLinkClient) -> Result<()> {
    let mut consumer = client
        .consumer()
        .group_id("billing-worker")
        .topic("orders.events")
        .auto_offset_reset(AutoOffsetReset::Earliest)
        .enable_auto_commit(false)
        .max_poll_records(25)
        .poll_timeout(Duration::from_secs(10))
        .build()?;

    let records = consumer.poll().await?;
    for record in &records {
        println!("offset={} bytes={}", record.offset, record.payload.len());
        consumer.mark_processed(record);
    }

    let commit = consumer.commit_sync().await?;
    println!("committed offset {}", commit.acknowledged_offset);

    consumer.close().await?;
    Ok(())
}

Common Pitfalls
- base_url(...) is required on every KalamLinkClient builder.
- User/password auth is a login bootstrap. The SDK exchanges it for JWT auth before protected SQL, topic, or WebSocket requests.
- execute_with_files(...) requires the file-uploads feature. The native-full feature includes it.
- client.consumer() requires the consumer feature. The native-full feature includes it.
- Long-running services should close subscriptions and consumers with close() during shutdown.
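
For the shutdown point, one reasonable approach is to close every open resource even when an earlier close() fails, and to report the failures afterwards. The sketch below illustrates only that ordering and error handling. Closable, close_all, and Fake are hypothetical names, and the trait is synchronous for brevity, whereas the SDK's close() calls shown earlier are awaited.

```rust
/// Hypothetical stand-in for anything with a fallible `close()`.
/// Not an SDK trait; it exists only to illustrate shutdown ordering.
pub trait Closable {
    fn name(&self) -> &str;
    fn close(&mut self) -> Result<(), String>;
}

/// Closes every resource in reverse registration order and keeps going on failure.
/// Returns `(name, error)` for each resource that failed to close.
pub fn close_all<C: Closable>(resources: &mut [C]) -> Vec<(String, String)> {
    let mut failures = Vec::new();
    for resource in resources.iter_mut().rev() {
        if let Err(err) = resource.close() {
            failures.push((resource.name().to_string(), err));
        }
    }
    failures
}

/// Illustrative resource that can be told to fail on close.
pub struct Fake {
    pub label: &'static str,
    pub fail: bool,
    pub closed: bool,
}

impl Closable for Fake {
    fn name(&self) -> &str {
        self.label
    }

    fn close(&mut self) -> Result<(), String> {
        if self.fail {
            Err(format!("{}: close failed", self.label))
        } else {
            self.closed = true;
            Ok(())
        }
    }
}
```

In an async service the same idea applies: await each close() in turn, collect the errors, and log them after everything has been attempted.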