SDK Cookbook
This page is a practical recipe collection built from real SDK source usage and the examples/chat-with-ai implementation.
Recipe 1: Chat with AI pipeline (topic + consumer + subscriptions)
This mirrors the chat-with-ai architecture:
- User message inserted into
chat.messages - CDC routes inserts into topic
chat.ai_processing - Background service consumes topic and writes AI reply back to table
- Frontend receives both user and AI rows instantly via live subscription
1) SQL setup
CREATE NAMESPACE IF NOT EXISTS chat; CREATE TABLE chat.messages ( id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(), conversation_id BIGINT NOT NULL, sender TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'user', content TEXT NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW()) WITH (TYPE = 'USER'); CREATE TOPIC chat.ai_processing;ALTER TOPIC chat.ai_processing ADD SOURCE chat.messages ON INSERT WITH (payload = 'full');2) Frontend subscription (React)
const chatSql = ` SELECT * FROM chat.messages WHERE conversation_id = ${conversationId}`; const unsub = await client.live( chatSql, (rows) => { // Let the SDK hand the UI the current materialized conversation state. setMessages(rows); }, { subscriptionOptions: { last_rows: 200 }, },);3) Node.js background processor (consumer)
import { Auth } from '@kalamdb/client';import { createConsumerClient } from '@kalamdb/consumer'; const worker = createConsumerClient({ url: process.env.KALAMDB_URL ?? 'http://localhost:2900', authProvider: async () => Auth.basic(process.env.KALAMDB_USER!, process.env.KALAMDB_PASSWORD!),}); const handle = worker.consumer({ topic: 'chat.ai_processing', group_id: 'ai-processor-service', auto_ack: true, batch_size: 1,}); await handle.run(async (ctx) => { const payload = ctx.message.value as Record<string, unknown>; const row = (payload.row ?? payload) as { conversation_id: string | number; role: string; content: string; }; if (row.role !== 'user') return; const aiReply = await generateReply(row.content); const user = String(ctx.user ?? ''); if (!user) return; const conversationId = Number(row.conversation_id); if (!Number.isInteger(conversationId)) return; await worker.executeAsUser( 'INSERT INTO chat.messages (conversation_id, sender, role, content) VALUES ($1, $2, $3, $4)', user, [conversationId, 'AI Assistant', 'assistant', aiReply], );});4) Operational notes from source
executeAsUser(...)follows KalamDB’s role matrix; service workers can target service/user accounts, while direct USER-table queries remain scoped to the authenticated account.executeAsUser(...)is valid for USER and STREAM tables; shared tables still rely on their table policy.- Typing indicator updates are done via a stream table (
chat.typing_indicators). - Service loads WASM explicitly in Node contexts where needed.
Recipe 2: Resumable subscriptions with sequence checkpoints
Use from plus the typed lastSeqId checkpoints returned by getSubscriptions().
import { SeqId } from '@kalamdb/client'; let savedSeqId: SeqId | undefined;const eventsSql = "SELECT * FROM app.events WHERE source = 'system'"; const unsub = await client.live( eventsSql, (rows) => { // process the fully reconciled event feed renderEvents(rows); const active = client.getSubscriptions().find((sub) => sub.tableName === eventsSql); if (active?.lastSeqId) { savedSeqId = active.lastSeqId; } }, { subscriptionOptions: { ...(savedSeqId ? { from: savedSeqId } : {}), }, },);Recipe 2.1: Payload parser guard for consumer handlers
When your topic source uses WITH (payload = 'full'), parse both wrapper and direct row forms:
function extractMessageRow(value: unknown) { if (!value || typeof value !== 'object') return null; const record = value as Record<string, unknown>; const source = record.row && typeof record.row === 'object' ? (record.row as Record<string, unknown>) : record; const conversationId = Number(source.conversation_id); if (!Number.isInteger(conversationId)) return null; if (source.role !== 'user' && source.role !== 'assistant') return null; if (typeof source.content !== 'string') return null; return { conversation_id: conversationId, role: source.role, content: source.content, };}Recipe 3: Typed query helpers for safer app code
interface Conversation { id: string; title: string; created_at: string;} const conversations = await client.queryAll( 'SELECT id, title, created_at FROM chat.conversations ORDER BY updated_at DESC'); const firstConversation = await client.queryOne( 'SELECT id, title, created_at FROM chat.conversations LIMIT 1'); conversations.forEach((row) => { console.log(row.id.asString(), row.title.asString(), row.created_at.asString());}); console.log(firstConversation?.id.asString());Why this pattern helps:
- keeps SQL in one place
- maps array-row response into typed objects
- reduces unsafe indexing in UI code
Recipe 4: File attachments in messages
const result = await client.queryWithFiles( 'INSERT INTO chat.messages (conversation_id, sender, role, content, file_data) VALUES ($1, $2, $3, $4, FILE("attachment"))', { attachment: selectedFile }, [conversationId, 'user', 'user', messageText], (progress) => { setUploadPercent(progress.percent); });Then parse FILE metadata:
import { parseFileRef } from '@kalamdb/client'; const ref = parseFileRef(row.file_data);if (ref) { const downloadUrl = ref.getDownloadUrl('http://localhost:2900', 'chat', 'messages'); console.log(downloadUrl, ref.formatSize(), ref.getTypeDescription());}Recipe 5: Connection bootstrap for apps
const client = createClient({ url: 'http://localhost:2900', authProvider: async () => Auth.basic(username, password), wasmUrl: '/wasm/kalam_client_bg.wasm',}); await client.login();client.setAutoReconnect(true);client.setReconnectDelay(1000, 30000);client.setMaxReconnectAttempts(0);This mirrors the provider pattern in chat-with-ai: Basic credentials are exchanged for JWT automatically before the realtime connection. Most apps can let the first live/subscription call connect lazily; call connect() only when you want to establish the shared WebSocket eagerly.
Recipe 6: Graceful shutdown for worker services
const handle = worker.consumer({ topic: 'orders', group_id: 'order-worker', auto_ack: true }); const runPromise = handle.run(async (ctx) => { await processMessage(ctx.message.value);}); process.on('SIGINT', async () => { handle.stop(); await runPromise.catch(() => undefined); await worker.disconnect(); process.exit(0);});Recipe 7: Vector search for semantic retrieval
This matches the new vector-search flow tested in KalamDB core: embeddings live in regular SQL tables, vector indexes are created with DDL, and ranking happens in standard SQL.
1) Schema
CREATE TABLE rag.documents ( id BIGINT PRIMARY KEY, title TEXT NOT NULL, body TEXT NOT NULL) WITH (TYPE = 'USER'); CREATE TABLE rag.documents_vectors ( id BIGINT PRIMARY KEY, doc_embedding EMBEDDING(384)) WITH (TYPE = 'USER'); ALTER TABLE rag.documents_vectors CREATE INDEX doc_embedding USING COSINE;2) Insert or update embeddings
await client.query( `INSERT INTO rag.documents_vectors (id, doc_embedding) VALUES ($1, $2)`, [docId, JSON.stringify(embedding)],);3) Query nearest documents
const matches = await client.queryAll( `SELECT d.id, d.title, d.body FROM rag.documents AS d JOIN rag.documents_vectors AS v ON v.id = d.id ORDER BY COSINE_DISTANCE(v.doc_embedding, $1) LIMIT 5`, [JSON.stringify(queryEmbedding)],);Why this pattern helps:
- keeps semantic retrieval inside the same SQL surface as the rest of your app
- works with
TYPE = 'USER'for tenant-safe agent memory - lets you combine vectors, files, and structured rows without a separate vector service