Skip to Content

SDK Cookbook

This page is a practical recipe collection built from real SDK source usage and the examples/chat-with-ai implementation.

Recipe 1: Chat with AI pipeline (topic + consumer + subscriptions)

This mirrors the chat-with-ai architecture:

  1. User message inserted into chat.messages
  2. CDC routes inserts into topic chat.ai_processing
  3. Background service consumes topic and writes AI reply back to table
  4. Frontend receives both user and AI rows instantly via live subscription

1) SQL setup

CREATE NAMESPACE IF NOT EXISTS chat; CREATE TABLE chat.messages ( id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(), conversation_id BIGINT NOT NULL, sender TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'user', content TEXT NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ) WITH (TYPE = 'USER'); CREATE TOPIC chat.ai_processing; ALTER TOPIC chat.ai_processing ADD SOURCE chat.messages ON INSERT WITH (payload = 'full');

2) Frontend subscription (React)

const unsub = await client.subscribeWithSql( `SELECT * FROM chat.messages WHERE conversation_id = ${conversationId} ORDER BY created_at ASC`, (event) => { if (event.type === 'initial_data_batch' && event.rows) { setMessages((prev) => mergeRows(prev, event.rows)); return; } if (event.type === 'change' && event.change_type === 'insert' && event.rows) { setMessages((prev) => mergeRows(prev, event.rows)); } }, { batch_size: 200 } );

3) Node.js background processor (consumer)

import { createClient, Auth } from 'kalam-link'; const client = createClient({ url: process.env.KALAMDB_URL ?? 'http://localhost:8080', auth: Auth.basic(process.env.KALAMDB_USERNAME!, process.env.KALAMDB_PASSWORD!), }); await client.connect(); const handle = client.consumer({ topic: 'chat.ai_processing', group_id: 'ai-processor-service', auto_ack: true, batch_size: 1, }); await handle.run(async (ctx) => { const payload = ctx.message.value as Record<string, unknown>; const row = (payload.row ?? payload) as { conversation_id: string | number; role: string; content: string; }; if (row.role !== 'user') return; const aiReply = await generateReply(row.content); const username = String(ctx.username ?? ''); if (!username) return; const conversationId = Number(row.conversation_id); if (!Number.isInteger(conversationId)) return; await client.executeAsUser( `INSERT INTO chat.messages (conversation_id, sender, role, content) VALUES (${conversationId}, 'AI Assistant', 'assistant', '${aiReply.replace(/'/g, "''")}')`, username, ); });

4) Operational notes from source

  • chat-with-ai uses executeAsUser(...) to preserve per-user isolation in service writes.
  • Typing indicator updates are done via a stream table (chat.typing_indicators).
  • Service loads WASM explicitly in Node contexts where needed.

Recipe 2: Resumable subscriptions with sequence checkpoints

Use from_seq_id + getLastSeqId(subscriptionId).

let savedSeqId: number | undefined; const unsub = await client.subscribeWithSql( 'SELECT * FROM app.events ORDER BY created_at ASC', (event) => { if (event.type === 'change') { // process event } }, savedSeqId !== undefined ? { from_seq_id: savedSeqId } : undefined ); // Later, if you also keep subscription IDs in your app state: for (const sub of client.getSubscriptions()) { const seq = client.getLastSeqId(sub.id); if (seq) savedSeqId = Number(seq); }

Recipe 2.1: Payload parser guard for consumer handlers

When your topic source uses WITH (payload = 'full'), parse both wrapper and direct row forms:

function extractMessageRow(value: unknown) { if (!value || typeof value !== 'object') return null; const record = value as Record<string, unknown>; const source = record.row && typeof record.row === 'object' ? (record.row as Record<string, unknown>) : record; const conversationId = Number(source.conversation_id); if (!Number.isInteger(conversationId)) return null; if (source.role !== 'user' && source.role !== 'assistant') return null; if (typeof source.content !== 'string') return null; return { conversation_id: conversationId, role: source.role, content: source.content, }; }

Recipe 3: Typed query helpers for safer app code

interface Conversation { id: string; title: string; created_at: string; } const conversations = await client.queryAll<Conversation>( 'SELECT id, title, created_at FROM chat.conversations ORDER BY updated_at DESC' ); const firstConversation = await client.queryOne<Conversation>( 'SELECT id, title, created_at FROM chat.conversations LIMIT 1' );

Why this pattern helps:

  • keeps SQL in one place
  • maps array-row response into typed objects
  • reduces unsafe indexing in UI code

Recipe 4: File attachments in messages

const result = await client.queryWithFiles( 'INSERT INTO chat.messages (conversation_id, sender, role, content, file_data) VALUES ($1, $2, $3, $4, FILE("attachment"))', { attachment: selectedFile }, [conversationId, 'user', 'user', messageText], (progress) => { setUploadPercent(progress.percent); } );

Then parse FILE metadata:

import { parseFileRef } from 'kalam-link'; const ref = parseFileRef(row.file_data); if (ref) { const downloadUrl = ref.getDownloadUrl('http://localhost:8080', 'chat', 'messages'); console.log(downloadUrl, ref.formatSize(), ref.getTypeDescription()); }

Recipe 5: Connection bootstrap for apps

const client = createClient({ url: 'http://localhost:8080', auth: Auth.basic(username, password), wasmUrl: '/wasm/kalam_link_bg.wasm', }); await client.login(); await client.connect(); client.setAutoReconnect(true); client.setReconnectDelay(1000, 30000); client.setMaxReconnectAttempts(0);

This mirrors the provider pattern in chat-with-ai: Basic login first, then JWT-backed realtime connection.

Recipe 6: Graceful shutdown for worker services

const handle = client.consumer({ topic: 'orders', group_id: 'order-worker', auto_ack: true }); const runPromise = handle.run(async (ctx) => { await processMessage(ctx.message.value); }); process.on('SIGINT', async () => { handle.stop(); await runPromise.catch(() => undefined); await client.disconnect(); process.exit(0); });
Last updated on