SDK Cookbook
This page is a practical recipe collection built from real SDK source usage and the examples/chat-with-ai implementation.
Recipe 1: Chat with AI pipeline (topic + consumer + subscriptions)
This mirrors the chat-with-ai architecture:
- User message inserted into `chat.messages`
- CDC routes inserts into topic `chat.ai_processing`
- Background service consumes the topic and writes the AI reply back to the table
- Frontend receives both user and AI rows instantly via live subscription
1) SQL setup

```sql
CREATE NAMESPACE IF NOT EXISTS chat;

CREATE TABLE chat.messages (
  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
  conversation_id BIGINT NOT NULL,
  sender TEXT NOT NULL,
  role TEXT NOT NULL DEFAULT 'user',
  content TEXT NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
) WITH (TYPE = 'USER');

CREATE TOPIC chat.ai_processing;

ALTER TOPIC chat.ai_processing
  ADD SOURCE chat.messages ON INSERT WITH (payload = 'full');
```

2) Frontend subscription (React)
```typescript
const chatSql = `
  SELECT * FROM chat.messages
  WHERE conversation_id = ${conversationId}
`;

const unsub = await client.live(
  chatSql,
  (rows) => {
    // Let the SDK hand the UI the current materialized conversation state.
    setMessages(rows);
  },
  {
    subscriptionOptions: { last_rows: 200 },
  },
);
```

3) Node.js background processor (consumer)
```typescript
import { Auth } from '@kalamdb/client';
import { createConsumerClient } from '@kalamdb/consumer';

const worker = createConsumerClient({
  url: process.env.KALAMDB_URL ?? 'http://localhost:8080',
  authProvider: async () =>
    Auth.basic(process.env.KALAMDB_USERNAME!, process.env.KALAMDB_PASSWORD!),
});

const handle = worker.consumer({
  topic: 'chat.ai_processing',
  group_id: 'ai-processor-service',
  auto_ack: true,
  batch_size: 1,
});

await handle.run(async (ctx) => {
  const payload = ctx.message.value as Record<string, unknown>;
  const row = (payload.row ?? payload) as {
    conversation_id: string | number;
    role: string;
    content: string;
  };

  // Only respond to user-authored messages; skipping AI rows avoids
  // an infinite loop, since this handler's own inserts also hit the topic.
  if (row.role !== 'user') return;

  const aiReply = await generateReply(row.content);

  const username = String(ctx.username ?? '');
  if (!username) return;

  const conversationId = Number(row.conversation_id);
  if (!Number.isInteger(conversationId)) return;

  await worker.executeAsUser(
    'INSERT INTO chat.messages (conversation_id, sender, role, content) VALUES ($1, $2, $3, $4)',
    username,
    [conversationId, 'AI Assistant', 'assistant', aiReply],
  );
});
```

4) Operational notes from source
- chat-with-ai uses `executeAsUser(...)` to preserve per-user isolation in service writes.
- Typing-indicator updates are done via a stream table (`chat.typing_indicators`).
- The service loads WASM explicitly in Node contexts where needed.
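The consumer above calls a `generateReply` helper that the example leaves undefined; chat-with-ai delegates it to a real AI backend, which is outside the SDK. A minimal placeholder sketch (the reply wording is ours, not part of the example), so the pipeline can be exercised end to end:

```typescript
// Synchronous core of the reply logic, kept separate so it is easy to test.
// The canned reply text is a placeholder, not what chat-with-ai produces.
function composeReply(userContent: string): string {
  const trimmed = userContent.trim();
  if (trimmed.length === 0) {
    return 'I did not catch any text. Could you rephrase?';
  }
  return `You said: "${trimmed}". (No AI backend configured in this sketch.)`;
}

// Async wrapper matching the call site in the consumer handler.
async function generateReply(userContent: string): Promise<string> {
  // Replace composeReply with a call to your model provider here.
  return composeReply(userContent);
}
```

Swapping the stub for a real model call only changes the body of `generateReply`; the consumer loop stays the same.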
Recipe 2: Resumable subscriptions with sequence checkpoints
Use the `from` subscription option together with the typed `lastSeqId` checkpoints returned by `getSubscriptions()`.
```typescript
import { SeqId } from '@kalamdb/client';

let savedSeqId: SeqId | undefined;

const eventsSql = "SELECT * FROM app.events WHERE source = 'system'";

const unsub = await client.live(
  eventsSql,
  (rows) => {
    // Process the fully reconciled event feed.
    renderEvents(rows);

    const active = client.getSubscriptions().find((sub) => sub.tableName === eventsSql);
    if (active?.lastSeqId) {
      savedSeqId = active.lastSeqId;
    }
  },
  {
    subscriptionOptions: {
      ...(savedSeqId ? { from: savedSeqId } : {}),
    },
  },
);
```

Recipe 2.1: Payload parser guard for consumer handlers
When your topic source uses `WITH (payload = 'full')`, parse both wrapper and direct row forms:
```typescript
function extractMessageRow(value: unknown) {
  if (!value || typeof value !== 'object') return null;
  const record = value as Record<string, unknown>;
  const source =
    record.row && typeof record.row === 'object'
      ? (record.row as Record<string, unknown>)
      : record;

  const conversationId = Number(source.conversation_id);
  if (!Number.isInteger(conversationId)) return null;
  if (source.role !== 'user' && source.role !== 'assistant') return null;
  if (typeof source.content !== 'string') return null;

  return {
    conversation_id: conversationId,
    role: source.role,
    content: source.content,
  };
}
```

Recipe 3: Typed query helpers for safer app code
```typescript
interface Conversation {
  id: string;
  title: string;
  created_at: string;
}

const conversations = await client.queryAll(
  'SELECT id, title, created_at FROM chat.conversations ORDER BY updated_at DESC'
);

const firstConversation = await client.queryOne(
  'SELECT id, title, created_at FROM chat.conversations LIMIT 1'
);

conversations.forEach((row) => {
  console.log(row.id.asString(), row.title.asString(), row.created_at.asString());
});

console.log(firstConversation?.id.asString());
```

Why this pattern helps:
- keeps SQL in one place
- maps array-row response into typed objects
- reduces unsafe indexing in UI code
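The mapping step can be factored into a small helper. This is a sketch with names of our own choosing, not SDK exports; `SqlValue` models the `.asString()` accessor used in the queries above:

```typescript
// Hypothetical helper: convert SDK value-object rows into plain typed objects.
// SqlValue mirrors the `.asString()` accessor seen on query results above.
interface SqlValue {
  asString(): string;
}

interface Conversation {
  id: string;
  title: string;
  created_at: string;
}

function toConversation(row: Record<'id' | 'title' | 'created_at', SqlValue>): Conversation {
  return {
    id: row.id.asString(),
    title: row.title.asString(),
    created_at: row.created_at.asString(),
  };
}
```

UI code then works with `Conversation[]` everywhere and never indexes raw row values directly.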
Recipe 4: File attachments in messages
```typescript
const result = await client.queryWithFiles(
  'INSERT INTO chat.messages (conversation_id, sender, role, content, file_data) VALUES ($1, $2, $3, $4, FILE("attachment"))',
  { attachment: selectedFile },
  [conversationId, 'user', 'user', messageText],
  (progress) => {
    setUploadPercent(progress.percent);
  }
);
```

Then parse FILE metadata:
```typescript
import { parseFileRef } from '@kalamdb/client';

const ref = parseFileRef(row.file_data);
if (ref) {
  const downloadUrl = ref.getDownloadUrl('http://localhost:8080', 'chat', 'messages');
  console.log(downloadUrl, ref.formatSize(), ref.getTypeDescription());
}
```

Recipe 5: Connection bootstrap for apps
```typescript
const client = createClient({
  url: 'http://localhost:8080',
  authProvider: async () => Auth.basic(username, password),
  wasmUrl: '/wasm/kalam_client_bg.wasm',
});

await client.login();
client.setAutoReconnect(true);
client.setReconnectDelay(1000, 30000);
client.setMaxReconnectAttempts(0);
```

This mirrors the provider pattern in chat-with-ai: Basic credentials are exchanged for a JWT automatically before the realtime connection is opened, so there is no need to call connect() manually.
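`setReconnectDelay(1000, 30000)` sets the minimum and maximum reconnect delay. Assuming the SDK backs off exponentially between those bounds (an assumption about its internals, not documented behavior), the schedule would look like:

```typescript
// Sketch of an exponential backoff clamped between the two
// setReconnectDelay bounds. Assumed behavior, shown for intuition only.
function reconnectDelayMs(attempt: number, minMs = 1000, maxMs = 30000): number {
  const delay = minMs * 2 ** attempt; // attempt 0 => minMs
  return Math.min(delay, maxMs);
}

// reconnectDelayMs(0) -> 1000, reconnectDelayMs(1) -> 2000,
// reconnectDelayMs(5) -> 30000 (32000 clamped to the max)
```

With `setMaxReconnectAttempts(0)` interpreted as unlimited retries, the delay simply stays pinned at the maximum once the clamp is reached.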
Recipe 6: Graceful shutdown for worker services
```typescript
const handle = worker.consumer({ topic: 'orders', group_id: 'order-worker', auto_ack: true });

const runPromise = handle.run(async (ctx) => {
  await processMessage(ctx.message.value);
});

process.on('SIGINT', async () => {
  handle.stop();
  await runPromise.catch(() => undefined);
  await worker.disconnect();
  process.exit(0);
});
```

Recipe 7: Vector search for semantic retrieval
This matches the new vector-search flow tested in KalamDB core: embeddings live in regular SQL tables, vector indexes are created with DDL, and ranking happens in standard SQL.
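The recipe assumes you produce the 384-dimension embeddings yourself, from whatever model you use; KalamDB only stores and ranks them. A toy stand-in embedder (entirely our own, for wiring up the flow before a real model is connected):

```typescript
// Toy stand-in embedder: hash characters into a fixed-size vector and
// L2-normalize it. Replace with a real embedding model in practice.
function toyEmbedding(text: string, dims = 384): number[] {
  const vec = new Array<number>(dims).fill(0);
  for (let i = 0; i < text.length; i++) {
    vec[(text.charCodeAt(i) * 31 + i) % dims] += 1;
  }
  const norm = Math.hypot(...vec) || 1;
  return vec.map((x) => x / norm);
}
```

Normalizing to unit length keeps cosine ranking well behaved regardless of text length.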
1) Schema

```sql
CREATE TABLE rag.documents (
  id BIGINT PRIMARY KEY,
  title TEXT NOT NULL,
  body TEXT NOT NULL
) WITH (TYPE = 'USER');

CREATE TABLE rag.documents_vectors (
  id BIGINT PRIMARY KEY,
  doc_embedding EMBEDDING(384)
) WITH (TYPE = 'USER');

ALTER TABLE rag.documents_vectors
  CREATE INDEX doc_embedding USING COSINE;
```

2) Insert or update embeddings
```typescript
await client.query(
  `INSERT INTO rag.documents_vectors (id, doc_embedding)
   VALUES ($1, $2)`,
  [docId, JSON.stringify(embedding)],
);
```

3) Query nearest documents
```typescript
const matches = await client.queryAll(
  `SELECT d.id, d.title, d.body
   FROM rag.documents AS d
   JOIN rag.documents_vectors AS v ON v.id = d.id
   ORDER BY COSINE_DISTANCE(v.doc_embedding, $1)
   LIMIT 5`,
  [JSON.stringify(queryEmbedding)],
);
```

Why this pattern helps:
- keeps semantic retrieval inside the same SQL surface as the rest of your app
- works with `TYPE = 'USER'` for tenant-safe agent memory
- lets you combine vectors, files, and structured rows without a separate vector service
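For intuition about the ranking query, cosine distance is the standard `1 - (a·b)/(|a||b|)`; a plain TypeScript rendering of the formula (our own sketch, not how the engine computes it internally):

```typescript
// Cosine distance between two equal-length vectors: 1 - (a.b)/(|a||b|).
// 0 means identical direction, 1 means orthogonal, 2 means opposite.
function cosineDistance(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error('dimension mismatch');
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return 1 - dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
```

Since `ORDER BY COSINE_DISTANCE(...)` sorts ascending, the top results are the documents pointing in the most similar direction to the query embedding.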