Skip to Content
Chat Applications

Chat Applications

This use case shows how to build a production-style AI chat application entirely on KalamDB: authenticated per-user message storage, async AI reply generation through a topic pipeline, and zero-polling live updates in the browser.

The chat-with-ai example  is a fully working reference you can run locally in about 10 minutes.

If you want a minimal docs-only path first (no Keycloak), start with:

Core patterns used

  • USER tables give each authenticated user a private, isolated data partition
  • Live subscriptions deliver new rows to the browser instantly over a single WebSocket connection
  • Topics + consumer groups handle async AI reply generation without coupling the frontend to AI latency
  • Keycloak OIDC tokens are passed directly to @kalamdb/client — no separate session store needed

Architecture

plaintext snippetplaintext
Browser (Next.js)          KalamDB Server             Keycloak  keycloak-js PKCE    →    validate JWT (RS256)  ←   JWKS endpoint  @kalamdb/client WS  →                          │  INSERT message      →    chat.messages         │                                  │ CDC           │                                  ▼               │                          chat.ai_processing      │                          (topic)                 │                                  │ consume        │                                  ▼               │                           AI Processor (Node.js) │                           Gemini API call        │                           INSERT reply           │  live subscription   ←    chat.messages ◀────────┘  (message appears)

Flow:

  1. User sends a message → INSERT INTO chat.messages via @kalamdb/client
  2. KalamDB CDC publishes the row to the chat.ai_processing topic
  3. Standalone AI processor service consumes from the topic
  4. Service generates a reply via Gemini and inserts it as an assistant row
  5. Browser subscription fires immediately with the new row — no polling

Authentication with Keycloak OIDC

The example uses Keycloak for browser-based OIDC login (PKCE flow). After the user logs in, @kalamdb/client receives the JWT directly:

typescript snippettypescript
// keycloak-js provides the token after loginconst client = createClient({  url: 'http://localhost:2900',  authProvider: async () => Auth.jwt(keycloak.token),});

The first live(), liveTable(), or liveEvents() call opens the shared socket automatically.

KalamDB validates the JWT against Keycloak’s JWKS endpoint. No username/password is stored in the frontend. When a new user logs in for the first time, KalamDB auto-provisions their account if auto_create_users_from_provider = true is set in server.toml.

Required server configuration:

toml snippetTOML
jwt_trusted_issuers = "http://localhost:2901/realms/kalamdb"auto_create_users_from_provider = true

Schema

sql snippetSQL
CREATE NAMESPACE IF NOT EXISTS chat; CREATE TABLE chat.conversations (  id          BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),  title       TEXT NOT NULL,  created_at  TIMESTAMP DEFAULT NOW()) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000'); CREATE TABLE chat.messages (  id              BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),  conversation_id BIGINT NOT NULL,  role            TEXT NOT NULL,   -- 'user' | 'assistant'  content         TEXT NOT NULL,  created_at      TIMESTAMP DEFAULT NOW()) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000'); -- Ephemeral typing indicators, auto-expired after 30 secondsCREATE TABLE chat.typing_events (  id              BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),  conversation_id BIGINT NOT NULL,  user_id         TEXT NOT NULL,  event_type      TEXT NOT NULL,  created_at      TIMESTAMP DEFAULT NOW()) WITH (TYPE = 'STREAM', TTL_SECONDS = 30); -- Topic for async AI processingCREATE TOPIC chat.ai_processing; ALTER TOPIC chat.ai_processingADD SOURCE chat.messagesON INSERTWITH (payload = 'full');

TYPE = 'USER' partitions data per authenticated user — each user sees only their own conversations and messages. TYPE = 'STREAM' creates an append-only short-lived log, ideal for ephemeral events like typing indicators.

Frontend — live subscription

typescript snippettypescript
const unsub = await client.live(  `SELECT id, conversation_id, role, content, created_at   FROM chat.messages   WHERE conversation_id = ${conversationIdNum}`,  (rows) => {    const ordered = [...rows].sort(      (left, right) =>        (left.created_at.asDate()?.getTime() ?? 0) -        (right.created_at.asDate()?.getTime() ?? 0),    );    setMessages(ordered);  },  {    subscriptionOptions: { last_rows: 200 },  },);

A single WebSocket connection handles all table subscriptions for the session. When the AI processor inserts a reply, the server pushes the new row immediately without polling or server-sent events.

AI processor — background worker

typescript snippettypescript
import { Auth } from '@kalamdb/client';import { createConsumerClient } from '@kalamdb/consumer'; const workerClient = createConsumerClient({  url: 'http://localhost:2900',  authProvider: async () => Auth.basic('ai-processor', 'Secret123!'),}); const worker = workerClient.consumer({  topic: 'chat.ai_processing',  group_id: 'ai-processor',  auto_ack: true,  batch_size: 1,}); await worker.run(async (ctx) => {  const row = ctx.message.value?.row ?? ctx.message.value;  if (row?.role !== 'user') return;         // ignore assistant rows   const reply = await generateReply(row.content);  const user = String(ctx.user ?? '');  if (!user) return;   await workerClient.executeAsUser(    'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',    user,    [Number(row.conversation_id), 'assistant', reply]  );});

The worker is a standalone Node.js process running @kalamdb/consumer on top of @kalamdb/client. Production workers should authenticate as a service account so executeAsUser() can place the row in the producer’s partition through KalamDB’s explicit role matrix. It consumes with auto_ack: true (fire-and-forget) — production deployments should switch to manual ack with idempotency keys.

Why this fits KalamDB

RequirementKalamDB feature
Per-user data isolationTYPE = 'USER' partitioned tables
Zero-latency UI updatesLive subscriptions over WebSocket
Async AI processingTopics + consumer groups
Ephemeral events (typing)TYPE = 'STREAM' with TTL
SSO / enterprise authOIDC JWKS validation, PKCE flow
Auto-provisioningauto_create_users_from_provider

Running the example

Full step-by-step setup including Keycloak Docker, database setup, Gemini API key configuration, and running all three UI designs:

Quick reference:

bash snippetBASH
# 1. Start KalamDB servercd backend && cargo run # 2. Start Keycloakcd docker/utils && docker-compose up -d keycloak # 3. Create schema, users, and .env.localcd examples/chat-with-ai && ./setup.sh # 4. Set your Gemini API key in .env.localecho 'GEMINI_API_KEY=your_key_here' >> .env.local # 5. Install dependenciesnpm install # 6. Start AI processor (separate terminal)npm run service # 7. Start Next.js (separate terminal)npm run dev # 8. Open a designopen http://localhost:3000/design1

Login with kalamdb-user / kalamdb123 or register a new account directly on the Keycloak login page.

Last updated on