Chat Applications
This use case shows how to build a production-style AI chat application entirely on KalamDB: authenticated per-user message storage, async AI reply generation through a topic pipeline, and zero-polling live updates in the browser.
The chat-with-ai example is a fully working reference you can run locally in about 10 minutes.
If you want a minimal docs-only path first (no Keycloak), start with:
Core patterns used
USERtables give each authenticated user a private, isolated data partition- Live subscriptions deliver new rows to the browser instantly over a single WebSocket connection
- Topics + consumer groups handle async AI reply generation without coupling the frontend to AI latency
- Keycloak OIDC tokens are passed directly to kalam-link — no separate session store needed
Architecture
Browser (Next.js) KalamDB Server Keycloak
keycloak-js PKCE → validate JWT (RS256) ← JWKS endpoint
kalam-link WS → │
INSERT message → chat.messages │
│ CDC │
▼ │
chat.ai_processing │
(topic) │
│ consume │
▼ │
AI Processor (Node.js) │
Gemini API call │
INSERT reply │
live subscription ← chat.messages ◀────────┘
(message appears)Flow:
- User sends a message →
INSERT INTO chat.messagesvia kalam-link - KalamDB CDC publishes the row to the
chat.ai_processingtopic - Standalone AI processor service consumes from the topic
- Service generates a reply via Gemini and inserts it as an
assistantrow - Browser subscription fires immediately with the new row — no polling
Authentication with Keycloak OIDC
The example uses Keycloak for browser-based OIDC login (PKCE flow). After the user logs in, kalam-link receives the JWT directly:
// keycloak-js provides the token after login
const client = createClient({
url: 'http://localhost:8080',
auth: Auth.jwt(keycloak.token),
});
await client.connect();KalamDB validates the JWT against Keycloak’s JWKS endpoint. No username/password is stored in the frontend. When a new user logs in for the first time, KalamDB auto-provisions their account if auto_create_users_from_provider = true is set in server.toml.
Required server configuration:
jwt_trusted_issuers = "http://localhost:8081/realms/kalamdb"
auto_create_users_from_provider = trueSchema
CREATE NAMESPACE IF NOT EXISTS chat;
CREATE TABLE chat.conversations (
id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
title TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000');
CREATE TABLE chat.messages (
id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
conversation_id BIGINT NOT NULL,
role TEXT NOT NULL, -- 'user' | 'assistant'
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000');
-- Ephemeral typing indicators, auto-expired after 30 seconds
CREATE TABLE chat.typing_events (
id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
conversation_id BIGINT NOT NULL,
user_id TEXT NOT NULL,
event_type TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'STREAM', TTL_SECONDS = 30);
-- Topic for async AI processing
CREATE TOPIC chat.ai_processing;
ALTER TOPIC chat.ai_processing
ADD SOURCE chat.messages
ON INSERT
WITH (payload = 'full');TYPE = 'USER' partitions data per authenticated user — each user sees only their own conversations and messages. TYPE = 'STREAM' creates an append-only short-lived log, ideal for ephemeral events like typing indicators.
Frontend — live subscription
const unsub = await client.subscribeWithSql(
`SELECT * FROM chat.messages
WHERE conversation_id = ${conversationIdNum}
ORDER BY created_at ASC`,
(event) => {
if (event.type === 'change' && event.change_type === 'insert') {
addMessage(event.rows[0]);
}
},
{ batch_size: 200 }
);A single WebSocket connection handles all table subscriptions for the session. When the AI processor inserts a reply, the server pushes the new row immediately without any polling or server-sent events.
AI processor — background worker
const worker = client.consumer({
topic: 'chat.ai_processing',
group_id: 'ai-processor',
auto_ack: true,
batch_size: 1,
});
await worker.run(async (ctx) => {
const row = ctx.message.value?.row ?? ctx.message.value;
if (row?.role !== 'user') return; // ignore assistant rows
const reply = await generateReply(row.content);
const username = String(ctx.username ?? '');
if (!username) return;
await client.executeAsUser(
'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
username,
[row.conversation_id, 'assistant', reply]
);
});The worker is a standalone Node.js process running the kalam-link WASM SDK. It consumes with auto_ack: true (fire-and-forget) — production deployments should switch to manual ack with idempotency keys.
Why this fits KalamDB
| Requirement | KalamDB feature |
|---|---|
| Per-user data isolation | TYPE = 'USER' partitioned tables |
| Zero-latency UI updates | Live subscriptions over WebSocket |
| Async AI processing | Topics + consumer groups |
| Ephemeral events (typing) | TYPE = 'STREAM' with TTL |
| SSO / enterprise auth | OIDC JWKS validation, PKCE flow |
| Auto-provisioning | auto_create_users_from_provider |
Running the example
Full step-by-step setup including Keycloak Docker, database setup, Gemini API key configuration, and running all three UI designs:
Quick reference:
# 1. Start KalamDB server
cd backend && cargo run
# 2. Start Keycloak
cd docker/utils && docker-compose up -d keycloak
# 3. Create schema, users, and .env.local
cd examples/chat-with-ai && ./setup.sh
# 4. Set your Gemini API key in .env.local
echo 'GEMINI_API_KEY=your_key_here' >> .env.local
# 5. Install dependencies
npm install
# 6. Start AI processor (separate terminal)
npm run service
# 7. Start Next.js (separate terminal)
npm run dev
# 8. Open a design
open http://localhost:3000/design1Login with kalamdb-user / kalamdb123 or register a new account directly on the Keycloak login page.