Skip to Content
SDK & ClientTypeScript SDKChat Quickstart (Docs-Only)

Chat Quickstart (Docs-Only)

This page gives a minimal, end-to-end TypeScript chat pipeline using only documented kalam-link APIs:

  • createClient, Auth.basic, login, connect
  • query, subscribeWithSql
  • consumer(...).run(...), executeAsUser

This path is for local development without OIDC setup.

Prerequisites

  • Node.js >= 18
  • running KalamDB server
  • a user that can run SQL and consume topic messages

Install:

npm i kalam-link

1) Create a connected client

import { createClient, Auth } from 'kalam-link'; const client = createClient({ url: 'http://localhost:8080', auth: Auth.basic('admin', 'AdminPass123!'), }); await client.login(); await client.connect();

2) Apply chat schema + topic

await client.query('CREATE NAMESPACE IF NOT EXISTS chat'); await client.query(` CREATE TABLE chat.conversations ( id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(), title TEXT NOT NULL, created_at TIMESTAMP DEFAULT NOW() ) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000') `); await client.query(` CREATE TABLE chat.messages ( id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(), conversation_id BIGINT NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, created_at TIMESTAMP DEFAULT NOW() ) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000') `); await client.query('CREATE TOPIC chat.ai_processing'); await client.query(` ALTER TOPIC chat.ai_processing ADD SOURCE chat.messages ON INSERT WITH (payload = 'full') `);

3) Start the worker consumer

TYPE = 'USER' tables are user-scoped. When a background worker writes assistant replies, write on behalf of the originating user.

const worker = client.consumer({ topic: 'chat.ai_processing', group_id: 'ai-processor', auto_ack: true, batch_size: 1, }); await worker.run(async (ctx) => { const payload = ctx.message.value as Record<string, unknown>; const row = ((payload.row as Record<string, unknown> | undefined) ?? payload) as { conversation_id?: number | string; role?: string; content?: string; }; if (row.role !== 'user' || typeof row.content !== 'string') { return; } const username = String(ctx.username ?? ''); if (!username) return; const conversationId = Number(row.conversation_id); if (!Number.isInteger(conversationId)) { return; } const reply = await generateReply(row.content); const escapedReply = reply.replace(/'/g, "''"); await client.executeAsUser( `INSERT INTO chat.messages (conversation_id, role, content) VALUES (${conversationId}, 'assistant', '${escapedReply}')`, username, ); });

4) Subscribe in the chat client

Use subscribeWithSql and filter by a validated numeric conversation ID.

const conversationId = 42; const unsubscribe = await client.subscribeWithSql( `SELECT * FROM chat.messages WHERE conversation_id = ${conversationId} ORDER BY created_at ASC`, (event) => { if (event.type === 'initial_data_batch' && event.rows) { renderRows(event.rows); return; } if (event.type === 'change' && event.change_type === 'insert' && event.rows) { renderRows(event.rows); } }, { batch_size: 200 } );

Insert a user message:

await client.query( 'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)', [conversationId, 'user', 'hello'], );

5) Cleanup

await unsubscribe(); await client.disconnect();

Next

Last updated on