Skip to Content
Cookbook

SDK Cookbook

This page is a practical recipe collection built from real SDK source usage and the examples/chat-with-ai implementation.

Recipe 1: Chat with AI pipeline (topic + consumer + subscriptions)

This mirrors the chat-with-ai architecture:

  1. User message inserted into chat.messages
  2. CDC routes inserts into topic chat.ai_processing
  3. Background service consumes topic and writes AI reply back to table
  4. Frontend receives both user and AI rows instantly via live subscription

1) SQL setup

sql snippetSQL
CREATE NAMESPACE IF NOT EXISTS chat; CREATE TABLE chat.messages (  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),  conversation_id BIGINT NOT NULL,  sender TEXT NOT NULL,  role TEXT NOT NULL DEFAULT 'user',  content TEXT NOT NULL,  created_at TIMESTAMP NOT NULL DEFAULT NOW()) WITH (TYPE = 'USER'); CREATE TOPIC chat.ai_processing;ALTER TOPIC chat.ai_processing  ADD SOURCE chat.messages ON INSERT WITH (payload = 'full');

2) Frontend subscription (React)

ts snippetTS
const chatSql = `  SELECT * FROM chat.messages  WHERE conversation_id = ${conversationId}`; const unsub = await client.live(  chatSql,  (rows) => {    // Let the SDK hand the UI the current materialized conversation state.    setMessages(rows);  },  {    subscriptionOptions: { last_rows: 200 },  },);

3) Node.js background processor (consumer)

ts snippetTS
import { Auth } from '@kalamdb/client';import { createConsumerClient } from '@kalamdb/consumer'; const worker = createConsumerClient({  url: process.env.KALAMDB_URL ?? 'http://localhost:2900',  authProvider: async () => Auth.basic(process.env.KALAMDB_USER!, process.env.KALAMDB_PASSWORD!),}); const handle = worker.consumer({  topic: 'chat.ai_processing',  group_id: 'ai-processor-service',  auto_ack: true,  batch_size: 1,}); await handle.run(async (ctx) => {  const payload = ctx.message.value as Record<string, unknown>;  const row = (payload.row ?? payload) as {    conversation_id: string | number;    role: string;    content: string;  };   if (row.role !== 'user') return;   const aiReply = await generateReply(row.content);  const user = String(ctx.user ?? '');  if (!user) return;  const conversationId = Number(row.conversation_id);  if (!Number.isInteger(conversationId)) return;   await worker.executeAsUser(    'INSERT INTO chat.messages (conversation_id, sender, role, content) VALUES ($1, $2, $3, $4)',    user,    [conversationId, 'AI Assistant', 'assistant', aiReply],  );});

4) Operational notes from source

  • executeAsUser(...) follows KalamDB’s role matrix; service workers can target service/user accounts, while direct USER-table queries remain scoped to the authenticated account.
  • executeAsUser(...) is valid for USER and STREAM tables; shared tables still rely on their table policy.
  • Typing indicator updates are done via a stream table (chat.typing_indicators).
  • Service loads WASM explicitly in Node contexts where needed.

Recipe 2: Resumable subscriptions with sequence checkpoints

Use from plus the typed lastSeqId checkpoints returned by getSubscriptions().

ts snippetTS
import { SeqId } from '@kalamdb/client'; let savedSeqId: SeqId | undefined;const eventsSql = "SELECT * FROM app.events WHERE source = 'system'"; const unsub = await client.live(  eventsSql,  (rows) => {    // process the fully reconciled event feed    renderEvents(rows);     const active = client.getSubscriptions().find((sub) => sub.tableName === eventsSql);    if (active?.lastSeqId) {      savedSeqId = active.lastSeqId;    }  },  {    subscriptionOptions: {      ...(savedSeqId ? { from: savedSeqId } : {}),    },  },);

Recipe 2.1: Payload parser guard for consumer handlers

When your topic source uses WITH (payload = 'full'), parse both wrapper and direct row forms:

ts snippetTS
function extractMessageRow(value: unknown) {  if (!value || typeof value !== 'object') return null;  const record = value as Record<string, unknown>;  const source =    record.row && typeof record.row === 'object'      ? (record.row as Record<string, unknown>)      : record;   const conversationId = Number(source.conversation_id);  if (!Number.isInteger(conversationId)) return null;  if (source.role !== 'user' && source.role !== 'assistant') return null;  if (typeof source.content !== 'string') return null;   return {    conversation_id: conversationId,    role: source.role,    content: source.content,  };}

Recipe 3: Typed query helpers for safer app code

ts snippetTS
interface Conversation {  id: string;  title: string;  created_at: string;} const conversations = await client.queryAll(  'SELECT id, title, created_at FROM chat.conversations ORDER BY updated_at DESC'); const firstConversation = await client.queryOne(  'SELECT id, title, created_at FROM chat.conversations LIMIT 1'); conversations.forEach((row) => {  console.log(row.id.asString(), row.title.asString(), row.created_at.asString());}); console.log(firstConversation?.id.asString());

Why this pattern helps:

  • keeps SQL in one place
  • maps array-row response into typed objects
  • reduces unsafe indexing in UI code

Recipe 4: File attachments in messages

ts snippetTS
const result = await client.queryWithFiles(  'INSERT INTO chat.messages (conversation_id, sender, role, content, file_data) VALUES ($1, $2, $3, $4, FILE("attachment"))',  { attachment: selectedFile },  [conversationId, 'user', 'user', messageText],  (progress) => {    setUploadPercent(progress.percent);  });

Then parse FILE metadata:

ts snippetTS
import { parseFileRef } from '@kalamdb/client'; const ref = parseFileRef(row.file_data);if (ref) {  const downloadUrl = ref.getDownloadUrl('http://localhost:2900', 'chat', 'messages');  console.log(downloadUrl, ref.formatSize(), ref.getTypeDescription());}

Recipe 5: Connection bootstrap for apps

ts snippetTS
const client = createClient({  url: 'http://localhost:2900',  authProvider: async () => Auth.basic(username, password),  wasmUrl: '/wasm/kalam_client_bg.wasm',}); await client.login();client.setAutoReconnect(true);client.setReconnectDelay(1000, 30000);client.setMaxReconnectAttempts(0);

This mirrors the provider pattern in chat-with-ai: Basic credentials are exchanged for JWT automatically before the realtime connection. Most apps can let the first live/subscription call connect lazily; call connect() only when you want to establish the shared WebSocket eagerly.

Recipe 6: Graceful shutdown for worker services

ts snippetTS
const handle = worker.consumer({ topic: 'orders', group_id: 'order-worker', auto_ack: true }); const runPromise = handle.run(async (ctx) => {  await processMessage(ctx.message.value);}); process.on('SIGINT', async () => {  handle.stop();  await runPromise.catch(() => undefined);  await worker.disconnect();  process.exit(0);});

Recipe 7: Vector search for semantic retrieval

This matches the new vector-search flow tested in KalamDB core: embeddings live in regular SQL tables, vector indexes are created with DDL, and ranking happens in standard SQL.

1) Schema

sql snippetSQL
CREATE TABLE rag.documents (  id BIGINT PRIMARY KEY,  title TEXT NOT NULL,  body TEXT NOT NULL) WITH (TYPE = 'USER'); CREATE TABLE rag.documents_vectors (  id BIGINT PRIMARY KEY,  doc_embedding EMBEDDING(384)) WITH (TYPE = 'USER'); ALTER TABLE rag.documents_vectors  CREATE INDEX doc_embedding USING COSINE;

2) Insert or update embeddings

ts snippetTS
await client.query(  `INSERT INTO rag.documents_vectors (id, doc_embedding)   VALUES ($1, $2)`,  [docId, JSON.stringify(embedding)],);

3) Query nearest documents

ts snippetTS
const matches = await client.queryAll(  `SELECT d.id, d.title, d.body   FROM rag.documents AS d   JOIN rag.documents_vectors AS v ON v.id = d.id   ORDER BY COSINE_DISTANCE(v.doc_embedding, $1)   LIMIT 5`,  [JSON.stringify(queryEmbedding)],);

Why this pattern helps:

  • keeps semantic retrieval inside the same SQL surface as the rest of your app
  • works with TYPE = 'USER' for tenant-safe agent memory
  • lets you combine vectors, files, and structured rows without a separate vector service
Last updated on