
SDK Cookbook

This page is a practical recipe collection built from real SDK source usage and the examples/chat-with-ai implementation.

Recipe 1: Chat with AI pipeline (topic + consumer + subscriptions)

This mirrors the chat-with-ai architecture:

  1. User message inserted into chat.messages
  2. CDC routes inserts into topic chat.ai_processing
  3. Background service consumes topic and writes AI reply back to table
  4. Frontend receives both user and AI rows instantly via live subscription

1) SQL setup

    CREATE NAMESPACE IF NOT EXISTS chat;

    CREATE TABLE chat.messages (
      id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
      conversation_id BIGINT NOT NULL,
      sender TEXT NOT NULL,
      role TEXT NOT NULL DEFAULT 'user',
      content TEXT NOT NULL,
      created_at TIMESTAMP NOT NULL DEFAULT NOW()
    ) WITH (TYPE = 'USER');

    CREATE TOPIC chat.ai_processing;

    ALTER TOPIC chat.ai_processing
      ADD SOURCE chat.messages ON INSERT
      WITH (payload = 'full');

2) Frontend subscription (React)

    const chatSql = `
      SELECT * FROM chat.messages
      WHERE conversation_id = ${conversationId}
    `;

    const unsub = await client.live(
      chatSql,
      (rows) => {
        // Let the SDK hand the UI the current materialized conversation state.
        setMessages(rows);
      },
      {
        subscriptionOptions: { last_rows: 200 },
      },
    );
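The query above interpolates conversationId directly into the SQL text. If that value can come from a route parameter or other user input, it may be worth validating it before building the string. The following is a minimal sketch, assuming IDs are non-negative decimal integers (consistent with the BIGINT column); buildChatSql is a hypothetical helper, not part of the SDK.

```typescript
// Hypothetical helper (not an SDK function): builds the subscription query
// only when the ID is a plain run of decimal digits.
function buildChatSql(conversationId: string | number): string {
  const id = String(conversationId).trim();
  // Checking the string form avoids precision loss for BIGINT values
  // above Number.MAX_SAFE_INTEGER when the ID arrives as a string.
  if (!/^\d+$/.test(id)) {
    throw new Error(`Invalid conversation id: ${id}`);
  }
  return `SELECT * FROM chat.messages WHERE conversation_id = ${id}`;
}
```

The result can be passed to client.live(...) in place of chatSql.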

3) Node.js background processor (consumer)

    import { Auth } from '@kalamdb/client';
    import { createConsumerClient } from '@kalamdb/consumer';

    const worker = createConsumerClient({
      url: process.env.KALAMDB_URL ?? 'http://localhost:8080',
      authProvider: async () =>
        Auth.basic(process.env.KALAMDB_USERNAME!, process.env.KALAMDB_PASSWORD!),
    });

    const handle = worker.consumer({
      topic: 'chat.ai_processing',
      group_id: 'ai-processor-service',
      auto_ack: true,
      batch_size: 1,
    });

    await handle.run(async (ctx) => {
      const payload = ctx.message.value as Record<string, unknown>;
      const row = (payload.row ?? payload) as {
        conversation_id: string | number;
        role: string;
        content: string;
      };

      if (row.role !== 'user') return;

      const aiReply = await generateReply(row.content);

      const username = String(ctx.username ?? '');
      if (!username) return;

      const conversationId = Number(row.conversation_id);
      if (!Number.isInteger(conversationId)) return;

      await worker.executeAsUser(
        'INSERT INTO chat.messages (conversation_id, sender, role, content) VALUES ($1, $2, $3, $4)',
        username,
        [conversationId, 'AI Assistant', 'assistant', aiReply],
      );
    });

4) Operational notes from source

  • chat-with-ai uses executeAsUser(...) to preserve per-user isolation in service writes.
  • Typing indicator updates are done via a stream table (chat.typing_indicators).
  • Service loads WASM explicitly in Node contexts where needed.

Recipe 2: Resumable subscriptions with sequence checkpoints

Pass the from subscription option together with the typed lastSeqId checkpoint returned by getSubscriptions().

    import { SeqId } from '@kalamdb/client';

    let savedSeqId: SeqId | undefined;
    const eventsSql = "SELECT * FROM app.events WHERE source = 'system'";

    const unsub = await client.live(
      eventsSql,
      (rows) => {
        // process the fully reconciled event feed
        renderEvents(rows);

        const active = client.getSubscriptions().find((sub) => sub.tableName === eventsSql);
        if (active?.lastSeqId) {
          savedSeqId = active.lastSeqId;
        }
      },
      {
        subscriptionOptions: {
          ...(savedSeqId ? { from: savedSeqId } : {}),
        },
      },
    );

Recipe 2.1: Payload parser guard for consumer handlers

When your topic source uses WITH (payload = 'full'), parse both wrapper and direct row forms:

    function extractMessageRow(value: unknown) {
      if (!value || typeof value !== 'object') return null;

      const record = value as Record<string, unknown>;
      const source =
        record.row && typeof record.row === 'object'
          ? (record.row as Record<string, unknown>)
          : record;

      const conversationId = Number(source.conversation_id);
      if (!Number.isInteger(conversationId)) return null;
      if (source.role !== 'user' && source.role !== 'assistant') return null;
      if (typeof source.content !== 'string') return null;

      return {
        conversation_id: conversationId,
        role: source.role,
        content: source.content,
      };
    }

Recipe 3: Typed query helpers for safer app code

    interface Conversation {
      id: string;
      title: string;
      created_at: string;
    }

    const conversations = await client.queryAll(
      'SELECT id, title, created_at FROM chat.conversations ORDER BY updated_at DESC'
    );

    const firstConversation = await client.queryOne(
      'SELECT id, title, created_at FROM chat.conversations LIMIT 1'
    );

    conversations.forEach((row) => {
      console.log(row.id.asString(), row.title.asString(), row.created_at.asString());
    });

    console.log(firstConversation?.id.asString());

Why this pattern helps:

  • keeps SQL in one place
  • maps array-row response into typed objects
  • reduces unsafe indexing in UI code
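
The snippet above declares Conversation but reads cells directly in the loop. One way to hand plain typed objects to the UI is a small mapper. This is a minimal sketch, assuming each cell exposes asString() as in the loop above and that it returns a string; StringCell, ConversationRow, and toConversation are illustrative names, not SDK exports.

```typescript
interface Conversation {
  id: string;
  title: string;
  created_at: string;
}

// Assumed minimal shape of a result cell; the real SDK type may differ.
interface StringCell {
  asString(): string;
}

// A row keyed by the Conversation column names, one cell per column.
type ConversationRow = Record<keyof Conversation, StringCell>;

// Convert one cell-based row into a plain typed object.
function toConversation(row: ConversationRow): Conversation {
  return {
    id: row.id.asString(),
    title: row.title.asString(),
    created_at: row.created_at.asString(),
  };
}
```

If the rows returned by queryAll are structurally compatible with ConversationRow, conversations.map(toConversation) would yield a Conversation[] for the rest of the app.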

Recipe 4: File attachments in messages

    const result = await client.queryWithFiles(
      'INSERT INTO chat.messages (conversation_id, sender, role, content, file_data) VALUES ($1, $2, $3, $4, FILE("attachment"))',
      { attachment: selectedFile },
      [conversationId, 'user', 'user', messageText],
      (progress) => {
        setUploadPercent(progress.percent);
      }
    );

Then parse FILE metadata:

    import { parseFileRef } from '@kalamdb/client';

    const ref = parseFileRef(row.file_data);
    if (ref) {
      const downloadUrl = ref.getDownloadUrl('http://localhost:8080', 'chat', 'messages');
      console.log(downloadUrl, ref.formatSize(), ref.getTypeDescription());
    }

Recipe 5: Connection bootstrap for apps

    const client = createClient({
      url: 'http://localhost:8080',
      authProvider: async () => Auth.basic(username, password),
      wasmUrl: '/wasm/kalam_client_bg.wasm',
    });

    await client.login();
    client.setAutoReconnect(true);
    client.setReconnectDelay(1000, 30000);
    client.setMaxReconnectAttempts(0);

This mirrors the provider pattern in chat-with-ai: Basic credentials are exchanged for a JWT automatically before the realtime connection is opened, so there is no need to call connect() manually.
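
The two arguments to setReconnectDelay(1000, 30000) read like an initial and a maximum delay in milliseconds. How the SDK spaces attempts between those bounds is not stated here. Capped exponential backoff is one common policy for such a pair, sketched below purely as an illustration; backoffDelay is a hypothetical function, not an SDK call.

```typescript
// Illustration only: capped exponential backoff. This is NOT taken from the
// SDK; it assumes the delay doubles on each attempt up to a maximum.
function backoffDelay(attempt: number, initialMs: number, maxMs: number): number {
  // attempt is zero-based: 0 gives initialMs, 1 gives 2x, 2 gives 4x, ...
  const delay = initialMs * Math.pow(2, attempt);
  return Math.min(delay, maxMs);
}
```

Under that assumption, a 1000 ms initial delay would reach the 30000 ms cap on the sixth attempt.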

Recipe 6: Graceful shutdown for worker services

    const handle = worker.consumer({
      topic: 'orders',
      group_id: 'order-worker',
      auto_ack: true,
    });

    const runPromise = handle.run(async (ctx) => {
      await processMessage(ctx.message.value);
    });

    process.on('SIGINT', async () => {
      handle.stop();
      await runPromise.catch(() => undefined);
      await worker.disconnect();
      process.exit(0);
    });
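
A signal handler can fire more than once, for example on a second Ctrl+C or when SIGINT is followed by SIGTERM. The following is a minimal sketch of a run-once shutdown wrapper that relies only on the stop() and disconnect() methods used above; Stoppable, Disconnectable, and createShutdown are hypothetical names, not SDK exports.

```typescript
// Structural types covering only the methods the recipe above relies on.
interface Stoppable {
  stop(): void;
}
interface Disconnectable {
  disconnect(): Promise<void>;
}

// Returns a shutdown function that performs the sequence at most once;
// repeated calls get the same in-flight promise back.
function createShutdown(
  handle: Stoppable,
  worker: Disconnectable,
  runPromise: Promise<unknown>,
): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    if (!pending) {
      pending = (async () => {
        handle.stop(); // ask the consume loop to exit
        await runPromise.catch(() => undefined); // wait for the run loop to settle
        await worker.disconnect(); // then close the connection
      })();
    }
    return pending;
  };
}

// Usage in Node (shown as comments so the block has no runtime dependencies):
//   const shutdown = createShutdown(handle, worker, runPromise);
//   process.on('SIGINT', () => { void shutdown().then(() => process.exit(0)); });
//   process.on('SIGTERM', () => { void shutdown().then(() => process.exit(0)); });
```

Sharing one promise means a second signal waits for the first shutdown instead of calling stop() and disconnect() again.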

Recipe 7: Vector search for semantic retrieval

This matches the new vector-search flow tested in KalamDB core: embeddings live in regular SQL tables, vector indexes are created with DDL, and ranking happens in standard SQL.

1) Schema

    CREATE NAMESPACE IF NOT EXISTS rag;

    CREATE TABLE rag.documents (
      id BIGINT PRIMARY KEY,
      title TEXT NOT NULL,
      body TEXT NOT NULL
    ) WITH (TYPE = 'USER');

    CREATE TABLE rag.documents_vectors (
      id BIGINT PRIMARY KEY,
      doc_embedding EMBEDDING(384)
    ) WITH (TYPE = 'USER');

    ALTER TABLE rag.documents_vectors
      CREATE INDEX doc_embedding USING COSINE;

2) Insert or update embeddings

    await client.query(
      `INSERT INTO rag.documents_vectors (id, doc_embedding) VALUES ($1, $2)`,
      [docId, JSON.stringify(embedding)],
    );

3) Query nearest documents

    const matches = await client.queryAll(
      `SELECT d.id, d.title, d.body
       FROM rag.documents AS d
       JOIN rag.documents_vectors AS v ON v.id = d.id
       ORDER BY COSINE_DISTANCE(v.doc_embedding, $1)
       LIMIT 5`,
      [JSON.stringify(queryEmbedding)],
    );

Why this pattern helps:

  • keeps semantic retrieval inside the same SQL surface as the rest of your app
  • works with TYPE = 'USER' for tenant-safe agent memory
  • lets you combine vectors, files, and structured rows without a separate vector service
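
For intuition about the ORDER BY in step 3: cosine distance is conventionally defined as 1 minus cosine similarity, so smaller values mean closer vectors and an ascending sort puts the best matches first. The following is a minimal client-side sketch of that conventional definition; whether KalamDB's COSINE_DISTANCE uses exactly this formula is an assumption, and cosineDistance is an illustrative helper rather than an SDK function.

```typescript
// Conventional cosine distance: 1 - (a . b) / (|a| * |b|).
function cosineDistance(a: number[], b: number[]): number {
  if (a.length !== b.length || a.length === 0) {
    throw new Error('Vectors must be non-empty and of equal length');
  }
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) {
    throw new Error('Cosine distance is undefined for zero vectors');
  }
  return 1 - dot / Math.sqrt(normA * normB);
}
```

A helper like this can be handy for spot-checking a few results locally against the ordering returned by the query.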