Chat Applications
This use case shows how to build a production-style AI chat application entirely on KalamDB: authenticated per-user message storage, async AI reply generation through a topic pipeline, and zero-polling live updates in the browser.
The chat-with-ai example is a fully working reference you can run locally in about 10 minutes.
If you want a minimal docs-only path first (no Keycloak), start with:
Core patterns used
- USER tables give each authenticated user a private, isolated data partition
- Live subscriptions deliver new rows to the browser instantly over a single WebSocket connection
- Topics + consumer groups handle async AI reply generation without coupling the frontend to AI latency
- Keycloak OIDC tokens are passed directly to @kalamdb/client, so no separate session store is needed
Architecture
```
Browser (Next.js)           KalamDB Server                  Keycloak
keycloak-js PKCE      →     validate JWT (RS256)      ←     JWKS endpoint
@kalamdb/client WS    →           │
INSERT message        →     chat.messages
                                  │ CDC
                                  ▼
                            chat.ai_processing (topic)
                                  │ consume
                                  ▼
                            AI Processor (Node.js)
                                  │ Gemini API call
                                  │ INSERT reply
live subscription     ←     chat.messages ◀────────┘
(message appears)
```

Flow:
- User sends a message → INSERT INTO chat.messages via @kalamdb/client
- KalamDB CDC publishes the row to the chat.ai_processing topic
- Standalone AI processor service consumes from the topic
- Service generates a reply via Gemini and inserts it as an assistant row
- Browser subscription fires immediately with the new row, with no polling
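The flow above can be mimicked with a toy in-memory model. Nothing in this sketch is KalamDB API: `Row`, `insert`, and `drainTopic` are hypothetical names used only for illustration. It is meant to show why the worker has to filter on role: a topic sourced from every INSERT also receives the assistant replies the worker writes itself.

```typescript
// Toy in-memory model of the message flow. Not KalamDB API: every name is hypothetical.
type Role = 'user' | 'assistant';
interface Row { id: number; role: Role; content: string }

const table: Row[] = [];   // stands in for chat.messages
const topic: Row[] = [];   // stands in for chat.ai_processing
const subscribers: Array<(row: Row) => void> = [];
let nextId = 1;

// INSERT: store the row, publish it to the topic (CDC), notify live subscribers.
function insert(role: Role, content: string): Row {
  const row: Row = { id: nextId++, role, content };
  table.push(row);
  topic.push(row);
  for (const notify of subscribers) notify(row);
  return row;
}

// Worker: consume everything currently queued, replying only to user rows.
function drainTopic(generateReply: (prompt: string) => string): void {
  while (topic.length > 0) {
    const row = topic.shift()!;
    if (row.role !== 'user') continue; // skip the worker's own replies
    insert('assistant', generateReply(row.content));
  }
}

// A "browser" subscriber that records what it is pushed.
const seen: string[] = [];
subscribers.push((row) => seen.push(`${row.role}: ${row.content}`));

insert('user', 'hello');
drainTopic((prompt) => `echo ${prompt}`);
```

Without the role check in `drainTopic`, each reply would trigger another reply and the loop would never drain.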
Authentication with Keycloak OIDC
The example uses Keycloak for browser-based OIDC login (PKCE flow). After the user logs in, @kalamdb/client receives the JWT directly:
```ts
import { createClient, Auth } from '@kalamdb/client';

// keycloak-js provides the token after login
const client = createClient({
  url: 'http://localhost:2900',
  authProvider: async () => Auth.jwt(keycloak.token),
});
```

The first live(), liveTable(), or liveEvents() call opens the shared socket automatically.
KalamDB validates the JWT against Keycloak’s JWKS endpoint. No username/password is stored in the frontend. When a new user logs in for the first time, KalamDB auto-provisions their account if auto_create_users_from_provider = true is set in server.toml.
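Keycloak access tokens are short-lived, so it can help to check how long a token has left before handing it to the client. The sketch below reads the standard `exp` claim from a JWT payload. The helper names `decodeJwtPayload` and `secondsUntilExpiry` are hypothetical, and the source does not say when `authProvider` is re-invoked, so treat where you call this as an assumption.

```typescript
// Decode the payload of a JWT without verifying it (verification stays on the server).
// Note: atob yields a binary string, which is fine for numeric claims such as exp.
function decodeJwtPayload(token: string): Record<string, unknown> {
  const parts = token.split('.');
  if (parts.length !== 3) throw new Error('not a JWT: expected three dot-separated parts');
  // JWT segments are base64url; convert to standard base64 before decoding.
  const base64 = parts[1].replace(/-/g, '+').replace(/_/g, '/');
  const padded = base64 + '='.repeat((4 - (base64.length % 4)) % 4);
  return JSON.parse(atob(padded));
}

// Seconds until the exp claim, or 0 if the token is expired or has no exp.
function secondsUntilExpiry(token: string, nowMs: number = Date.now()): number {
  const exp = decodeJwtPayload(token).exp;
  if (typeof exp !== 'number') return 0;
  return Math.max(0, Math.floor(exp - nowMs / 1000));
}
```

In a browser using keycloak-js, `keycloak.updateToken(minValidity)` is the usual way to refresh a token that is about to expire; a check like the one above is mainly useful for logging or for deciding when to prompt for re-login.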
Required server configuration:
```toml
jwt_trusted_issuers = "http://localhost:2901/realms/kalamdb"
auto_create_users_from_provider = true
```

Schema
```sql
CREATE NAMESPACE IF NOT EXISTS chat;

CREATE TABLE chat.conversations (
  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
  title TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000');

CREATE TABLE chat.messages (
  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
  conversation_id BIGINT NOT NULL,
  role TEXT NOT NULL, -- 'user' | 'assistant'
  content TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000');

-- Ephemeral typing indicators, auto-expired after 30 seconds
CREATE TABLE chat.typing_events (
  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
  conversation_id BIGINT NOT NULL,
  user_id TEXT NOT NULL,
  event_type TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'STREAM', TTL_SECONDS = 30);

-- Topic for async AI processing
CREATE TOPIC chat.ai_processing;

ALTER TOPIC chat.ai_processing
ADD SOURCE chat.messages
ON INSERT
WITH (payload = 'full');
```

TYPE = 'USER' partitions data per authenticated user, so each user sees only their own conversations and messages. TYPE = 'STREAM' creates an append-only short-lived log, ideal for ephemeral events like typing indicators.
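The id columns are BIGINT, and JavaScript's Number can only represent integers exactly up to 2^53 - 1. Whether SNOWFLAKE_ID() values exceed that limit is not stated here, although 64-bit snowflake-style ids typically do. The sketch below is one way to handle ids defensively, assuming the value is available as a string, number, or bigint; `parseBigintId` and `fitsInNumber` are hypothetical helper names.

```typescript
// Parse a BIGINT column value into a bigint without going through Number,
// so values above Number.MAX_SAFE_INTEGER keep every digit.
function parseBigintId(value: string | number | bigint): bigint {
  if (typeof value === 'bigint') return value;
  if (typeof value === 'number') {
    if (!Number.isSafeInteger(value)) {
      throw new RangeError(`id ${value} is not a safe integer; precision may already be lost`);
    }
    return BigInt(value);
  }
  if (!/^-?\d+$/.test(value)) throw new TypeError(`not an integer id: ${value}`);
  return BigInt(value);
}

// True when the id can be passed through Number() without changing its value.
function fitsInNumber(id: bigint): boolean {
  return id >= BigInt(Number.MIN_SAFE_INTEGER) && id <= BigInt(Number.MAX_SAFE_INTEGER);
}
```

For example, `Number('9007199254740993')` evaluates to `9007199254740992`, while `parseBigintId('9007199254740993')` keeps the final digit. If ids can be that large, passing them as strings or bigints end to end avoids silently addressing the wrong conversation.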
Frontend — live subscription
```ts
const unsub = await client.live(
  `SELECT id, conversation_id, role, content, created_at FROM chat.messages WHERE conversation_id = ${conversationIdNum}`,
  (rows) => {
    const ordered = [...rows].sort(
      (left, right) =>
        (left.created_at.asDate()?.getTime() ?? 0) -
        (right.created_at.asDate()?.getTime() ?? 0),
    );
    setMessages(ordered);
  },
  {
    subscriptionOptions: { last_rows: 200 },
  },
);
```

A single WebSocket connection handles all table subscriptions for the session. When the AI processor inserts a reply, the server pushes the new row immediately without polling or server-sent events.
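The sort in the callback compares timestamps only, so rows sharing a timestamp keep whatever order they arrived in. A minimal sketch of a deterministic ordering follows, using the id as a tie-breaker. It assumes rows have already been unwrapped into plain values and that ids increase over time, as snowflake-style ids typically do; `PlainMessage`, `compareIds`, and `orderMessages` are hypothetical names, not part of @kalamdb/client.

```typescript
// Hypothetical plain shape, assumed to be produced after unwrapping the SDK's cell values.
interface PlainMessage {
  id: string;                 // BIGINT kept as a decimal string
  createdAtMs: number | null; // epoch milliseconds, or null if missing
  content: string;
}

// Compare non-negative decimal id strings (no leading zeros) numerically
// without converting them to Number.
function compareIds(a: string, b: string): number {
  if (a.length !== b.length) return a.length - b.length;
  return a < b ? -1 : a > b ? 1 : 0;
}

// Oldest first; rows without a timestamp sort first, ties fall back to id order.
function orderMessages(rows: readonly PlainMessage[]): PlainMessage[] {
  return [...rows].sort((left, right) => {
    const byTime = (left.createdAtMs ?? 0) - (right.createdAtMs ?? 0);
    return byTime !== 0 ? byTime : compareIds(left.id, right.id);
  });
}
```

Keeping the comparator in a pure function like this also makes it easy to unit test separately from the subscription callback.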
AI processor — background worker
```ts
import { Auth } from '@kalamdb/client';
import { createConsumerClient } from '@kalamdb/consumer';

const workerClient = createConsumerClient({
  url: 'http://localhost:2900',
  authProvider: async () => Auth.basic('ai-processor', 'Secret123!'),
});

const worker = workerClient.consumer({
  topic: 'chat.ai_processing',
  group_id: 'ai-processor',
  auto_ack: true,
  batch_size: 1,
});

await worker.run(async (ctx) => {
  const row = ctx.message.value?.row ?? ctx.message.value;
  if (row?.role !== 'user') return; // ignore assistant rows

  const reply = await generateReply(row.content);
  const user = String(ctx.user ?? '');
  if (!user) return;

  await workerClient.executeAsUser(
    'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
    user,
    [Number(row.conversation_id), 'assistant', reply]
  );
});
```

The worker is a standalone Node.js process running @kalamdb/consumer on top of @kalamdb/client. Production workers should authenticate as a service account so executeAsUser() can place the row in the producer's partition through KalamDB's explicit role matrix. The example consumes with auto_ack: true (fire-and-forget); production deployments should switch to manual ack with idempotency keys.
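With manual ack, a message can be delivered more than once, so the handler needs a way to recognise work it has already done. The sketch below shows one possible building block: a bounded set of recently seen idempotency keys. `RecentKeys` and `replyKey` are hypothetical names, the key format is an assumption (it relies on the topic payload carrying the triggering row's id), and the manual-ack call itself is omitted because its name is not shown here.

```typescript
// Remembers the most recent keys so a redelivered message can be skipped.
// In-memory only: a real deployment would need a store that survives restarts.
class RecentKeys {
  private readonly keys = new Set<string>();
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Returns true the first time a key is seen, false on repeats.
  markIfNew(key: string): boolean {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    if (this.keys.size > this.capacity) {
      // Sets iterate in insertion order, so the first key is the oldest.
      const oldest = this.keys.values().next().value as string;
      this.keys.delete(oldest);
    }
    return true;
  }
}

// Hypothetical key: derived from the id of the user row that triggered the reply.
function replyKey(row: { id: string | number }): string {
  return `reply-for:${row.id}`;
}
```

Inside the handler, a worker could call `markIfNew(replyKey(row))` before generating a reply and return early on `false`. Marking the key before the insert succeeds trades duplicate replies for the risk of a dropped reply on failure, so a durable variant would record the key together with the inserted row.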
Why this fits KalamDB
| Requirement | KalamDB feature |
|---|---|
| Per-user data isolation | TYPE = 'USER' partitioned tables |
| Instant UI updates without polling | Live subscriptions over WebSocket |
| Async AI processing | Topics + consumer groups |
| Ephemeral events (typing) | TYPE = 'STREAM' with TTL |
| SSO / enterprise auth | OIDC JWKS validation, PKCE flow |
| Auto-provisioning | auto_create_users_from_provider |
Running the example
Full step-by-step setup including Keycloak Docker, database setup, Gemini API key configuration, and running all three UI designs:
Quick reference:
```bash
# 1. Start KalamDB server
cd backend && cargo run

# 2. Start Keycloak
cd docker/utils && docker-compose up -d keycloak

# 3. Create schema, users, and .env.local
cd examples/chat-with-ai && ./setup.sh

# 4. Set your Gemini API key in .env.local
echo 'GEMINI_API_KEY=your_key_here' >> .env.local

# 5. Install dependencies
npm install

# 6. Start AI processor (separate terminal)
npm run service

# 7. Start Next.js (separate terminal)
npm run dev

# 8. Open a design
open http://localhost:3000/design1
```

Log in with kalamdb-user / kalamdb123 or register a new account directly on the Keycloak login page.