Chat Quickstart (Docs-Only)
This page gives a minimal, end-to-end TypeScript chat pipeline using only documented kalam-link APIs:
createClient,Auth.basic,login,connectquery,subscribeWithSqlconsumer(...).run(...),executeAsUser
This path is for local development without OIDC setup.
Prerequisites
- Node.js
>= 18 - running KalamDB server
- a user that can run SQL and consume topic messages
Install:
npm i kalam-link1) Create a connected client
import { createClient, Auth } from 'kalam-link';
const client = createClient({
url: 'http://localhost:8080',
auth: Auth.basic('admin', 'AdminPass123!'),
});
await client.login();
await client.connect();2) Apply chat schema + topic
await client.query('CREATE NAMESPACE IF NOT EXISTS chat');
await client.query(`
CREATE TABLE chat.conversations (
id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
title TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000')
`);
await client.query(`
CREATE TABLE chat.messages (
id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
conversation_id BIGINT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000')
`);
await client.query('CREATE TOPIC chat.ai_processing');
await client.query(`
ALTER TOPIC chat.ai_processing
ADD SOURCE chat.messages
ON INSERT
WITH (payload = 'full')
`);3) Start the worker consumer
TYPE = 'USER' tables are user-scoped. When a background worker writes assistant replies, write on behalf of the originating user.
const worker = client.consumer({
topic: 'chat.ai_processing',
group_id: 'ai-processor',
auto_ack: true,
batch_size: 1,
});
await worker.run(async (ctx) => {
const payload = ctx.message.value as Record<string, unknown>;
const row = ((payload.row as Record<string, unknown> | undefined) ?? payload) as {
conversation_id?: number | string;
role?: string;
content?: string;
};
if (row.role !== 'user' || typeof row.content !== 'string') {
return;
}
const username = String(ctx.username ?? '');
if (!username) return;
const conversationId = Number(row.conversation_id);
if (!Number.isInteger(conversationId)) {
return;
}
const reply = await generateReply(row.content);
const escapedReply = reply.replace(/'/g, "''");
await client.executeAsUser(
`INSERT INTO chat.messages (conversation_id, role, content) VALUES (${conversationId}, 'assistant', '${escapedReply}')`,
username,
);
});4) Subscribe in the chat client
Use subscribeWithSql and filter by a validated numeric conversation ID.
const conversationId = 42;
const unsubscribe = await client.subscribeWithSql(
`SELECT * FROM chat.messages
WHERE conversation_id = ${conversationId}
ORDER BY created_at ASC`,
(event) => {
if (event.type === 'initial_data_batch' && event.rows) {
renderRows(event.rows);
return;
}
if (event.type === 'change' && event.change_type === 'insert' && event.rows) {
renderRows(event.rows);
}
},
{ batch_size: 200 }
);Insert a user message:
await client.query(
'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
[conversationId, 'user', 'hello'],
);5) Cleanup
await unsubscribe();
await client.disconnect();Next
Last updated on