Realtime Subscriptions
@kalamdb/client supports realtime subscriptions via WebSocket.
For application UI, start with live(). It returns the latest materialized row set, which is usually what your component state actually needs.
Live SQL must stay within the strict supported shape: SELECT ... FROM ... WHERE ....
Do not include ORDER BY or LIMIT inside live() or subscribeWithSql() SQL.
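The shape rule above can be enforced client-side before calling live(). The helper below is a sketch, not part of @kalamdb/client: it accepts only SELECT ... FROM ... with an optional WHERE, and rejects the ORDER BY and LIMIT clauses that live() and subscribeWithSql() do not support.

```typescript
// Hypothetical guard (not an SDK API): checks a statement against the
// SELECT ... FROM ... WHERE ... shape that live() supports.
function isLiveSqlShape(sql: string): boolean {
  const normalized = sql.trim().replace(/\s+/g, ' ');
  // Must start with SELECT ... FROM <table>.
  if (!/^SELECT\s.+\sFROM\s\S+/i.test(normalized)) return false;
  // ORDER BY and LIMIT are not allowed inside live()/subscribeWithSql() SQL.
  if (/\b(ORDER\s+BY|LIMIT)\b/i.test(normalized)) return false;
  return true;
}
```

Running the guard ahead of subscription setup turns a server-side rejection into an immediate, local error.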
Recommended: materialized live rows
const inboxSql = `
SELECT id, room, role, body, created_at
FROM support.inbox
WHERE room = 'main'
`;
const unsub = await client.live(
inboxSql,
(rows) => {
// USER tables are tenant-private.
// The same SQL can run for every signed-in account, but the callback only
// receives rows from that caller's own table partition.
renderInbox(rows);
},
{
subscriptionOptions: { last_rows: 100 },
onError: (event) => {
console.error(event.code, event.message);
},
},
);

Use liveTableRows() for the same behavior with table-name sugar:
const unsub = await client.liveTableRows('support.inbox', (rows) => {
renderInbox(rows);
});

This path is implemented in the shared Rust core, so TypeScript and Dart use the same row materialization behavior by default.
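The sugar relationship can be pictured as building the live SQL from the table name. This is an illustration of assumed behavior, not the SDK's internal code, and the name validation below is a defensive extra rather than a documented SDK check.

```typescript
// Assumed equivalence: liveTableRows(table, cb) behaves like
// live(`SELECT * FROM ${table}`, cb).
function tableToLiveSql(table: string): string {
  // Expect `namespace.table`; reject anything else so the interpolation
  // below cannot smuggle extra SQL into the statement.
  if (!/^[A-Za-z_][A-Za-z0-9_]*\.[A-Za-z_][A-Za-z0-9_]*$/.test(table)) {
    throw new Error(`invalid table name: ${table}`);
  }
  return `SELECT * FROM ${table}`;
}
```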
Resume from a specific SeqId
Persist the latest applied SeqId and feed it back into subscriptionOptions.from on the next session.
import { SeqId } from '@kalamdb/client';
const inboxSql = `
SELECT id, room, role, body, created_at
FROM support.inbox
WHERE room = 'main'
`;
const savedSeqText = localStorage.getItem('support.inbox.seq');
const startFrom = savedSeqText ? SeqId.from(savedSeqText) : undefined;
const unsub = await client.live(
inboxSql,
(rows) => {
renderInbox(rows);
// Save the latest checkpoint after each applied snapshot.
const active = client.getSubscriptions().find((sub) => sub.tableName === 'support.inbox');
if (active?.lastSeqId) {
localStorage.setItem('support.inbox.seq', active.lastSeqId.toString());
}
},
{
subscriptionOptions: {
last_rows: 100,
...(startFrom ? { from: startFrom } : {}),
},
},
);

This is the recommended pattern for chat timelines, activity feeds, audit streams, and reconnect-heavy mobile sessions.
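A small helper makes the checkpointing step harder to get wrong: the resume point should only ever move forward, even if callbacks fire out of order. The class below is a sketch layered on top of the pattern above, not an SDK API; it tracks the numeric sequence value and leaves persistence (e.g. localStorage) to the caller.

```typescript
// Sketch (not SDK API): monotonically advancing resume checkpoint.
class SeqCheckpoint {
  private last: number | null = null;

  // Record a newly applied sequence number; stale or duplicate values are ignored.
  advance(seq: number): void {
    if (this.last === null || seq > this.last) this.last = seq;
  }

  // Value to persist and feed into subscriptionOptions.from next session.
  resumeFrom(): number | null {
    return this.last;
  }
}
```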
Table subscription sugar
If you only need SELECT * FROM table, use the shorthand APIs:
const lowLevel = await client.subscribe('app.messages', (event) => {
console.log(event.type);
});
const highLevel = await client.liveTableRows('support.inbox', (rows) => {
renderInbox(rows);
});

When to use which API
- Use live() or liveTableRows() when you want the latest reconciled row array.
- Use subscribeWithSql() when you need low-level subscription_ack, initial_data_batch, change, or error frames.
Low-level SQL subscription
import { ChangeType, MessageType } from '@kalamdb/client';
const unsub = await client.subscribeWithSql(
'SELECT * FROM app.messages WHERE conversation_id = 42',
(event) => {
if (event.type !== MessageType.Change) {
return;
}
if (event.change_type === ChangeType.Insert) {
console.log(event.rows);
}
},
{ batch_size: 100, last_rows: 50 },
);

Concurrency notes
The TypeScript SDK test suite exercises shared-socket multiplexing, multi-client fan-out, and many simultaneous subscriptions on one client. In practice, this is the expected production shape: several listeners can share one client connection while concurrent writers continue to publish into the same table.
Custom row identity in TypeScript
By default, the high-level API reconciles rows using the row id field in the Rust core.
If your query does not expose a stable id, prefer declarative keyColumns so reconciliation still stays inside the shared Rust core:
const unsub = await client.live(
"SELECT room, message_id, body, created_at FROM chat.messages WHERE room = 'main'",
(rows) => {
console.log(rows);
},
{
keyColumns: ['room', 'message_id'],
},
);

Use getKey only when the identity must be derived by arbitrary JavaScript code:
const unsub = await client.live(
"SELECT room, created_at, body FROM chat.messages WHERE room = 'main'",
(rows) => {
console.log(rows);
},
{
getKey: (row) => `${row.room.asString()}:${row.created_at.asString()}`,
},
});

When getKey is provided, reconciliation falls back to the TypeScript SDK layer because arbitrary JavaScript callbacks cannot be shared through the Rust core.
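Conceptually, keyColumns amounts to deriving a composite identity from the named columns. The helper below models that idea in plain TypeScript; it is not what the Rust core executes, only an illustration of the assumed semantics.

```typescript
// Illustrative model of keyColumns identity (assumed semantics, not SDK code).
function compositeKey(row: Record<string, unknown>, keyColumns: string[]): string {
  return keyColumns
    .map((col) => {
      const value = row[col];
      // A key column that is missing or null cannot identify a row.
      if (value === null || value === undefined) {
        throw new Error(`key column '${col}' is missing or null`);
      }
      return String(value);
    })
    .join(':');
}
```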
Typed row subscriptions (subscribeRows)
subscribeRows<T>() wraps incoming rows as KalamRow<T> so you can use:
- row.data for raw values
- row.cell('col') for KalamCellValue
- row.file('file_col') for bound file URLs
type User = { id: string; name: string; avatar: unknown };
const unsub = await client.subscribeRows<User>('app.users', (change) => {
if (change.type === MessageType.InitialDataBatch) {
console.log('initial rows:', change.rows.length);
return;
}
if (change.type === MessageType.Change && change.raw.change_type === ChangeType.Insert) {
for (const row of change.rows) {
console.log('insert user:', row.data.name);
const avatar = row.file('avatar');
if (avatar) console.log(avatar.downloadUrl());
}
}
if (change.type === MessageType.Change && change.raw.change_type === ChangeType.Update) {
const before = change.oldValues[0]?.data;
const after = change.rows[0]?.data;
console.log('update:', before, '→', after);
}
});

Event model
Use runtime enums from types.ts:
import { MessageType, ChangeType } from '@kalamdb/client';

Event sequence commonly observed:
- subscription_ack
- one or more initial_data_batch
- change events (insert|update|delete)
- optional error
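The sequence above is easiest to handle with an exhaustive switch over the event discriminator. The local union below stands in for the real frame types, which come from '@kalamdb/client'; its payload fields are illustrative assumptions, not the SDK's exact shapes.

```typescript
// Local stand-ins for the wire frames; real types live in '@kalamdb/client'.
type SubscriptionEvent =
  | { type: 'subscription_ack' }
  | { type: 'initial_data_batch'; rows: unknown[] }
  | { type: 'change'; change_type: 'insert' | 'update' | 'delete'; rows: unknown[] }
  | { type: 'error'; code: string; message: string };

// Exhaustive handling: the string return type forces every frame kind to be covered.
function describeEvent(event: SubscriptionEvent): string {
  switch (event.type) {
    case 'subscription_ack':
      return 'ack';
    case 'initial_data_batch':
      return `initial:${event.rows.length}`;
    case 'change':
      return `${event.change_type}:${event.rows.length}`;
    case 'error':
      return `error:${event.code}`;
  }
}
```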
Subscription options
{
subscriptionOptions?: {
batch_size?: number;
last_rows?: number;
from?: SeqId | number | string;
};
}

Use subscriptionOptions.from for resume patterns after reconnect.
SQL safety for dynamic filters
subscribeWithSql() accepts a SQL string. For dynamic values:
- Prefer validated numeric IDs before interpolation.
- Do not interpolate raw user text directly into SQL.
function buildConversationSql(conversationId: number): string {
if (!Number.isInteger(conversationId) || conversationId <= 0) {
throw new Error('conversationId must be a positive integer');
}
return `SELECT * FROM chat.messages WHERE conversation_id = ${conversationId}`;
}

Unsubscribe patterns
const unsub = await client.subscribe('app.messages', cb);
await unsub();
const subs = client.getSubscriptions();
if (subs[0]) {
await client.unsubscribe(subs[0].id);
}
await client.unsubscribeAll();

UPDATE notification rows
For change_type === "update" events, the server sends:
- rows — a full snapshot of all non-null columns in the updated row, plus the primary key column(s) and _seq. This means you always receive every non-null value, not just the columns that changed. This is particularly useful when using a table as a change trigger (e.g. an updates/events table where every column matters on each write).
- old_values — only the columns that actually changed, plus _seq and the primary key, with their previous values.
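The two payloads combine into a per-column diff. The helper below is a sketch derived from this contract, not an SDK API; pkColumns must name your primary key column(s), since only the _-prefixed system columns can be filtered out generically.

```typescript
// Sketch (not SDK API): pair each changed column's previous and current value.
function diffUpdate(
  newRow: Record<string, unknown>,
  oldRow: Record<string, unknown>,
  pkColumns: string[],
): { column: string; previous: unknown; current: unknown }[] {
  // Keys of old_values, minus system keys and the PK, are exactly the changed columns.
  return Object.keys(oldRow)
    .filter((k) => !k.startsWith('_') && !pkColumns.includes(k))
    .map((column) => ({ column, previous: oldRow[column], current: newRow[column] }));
}
```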
What each row contains
| Field | Column | Always present | Description |
|---|---|---|---|
| rows | _seq | ✅ | Monotonically increasing sequence number — identifies the row version |
| rows | <pk> (e.g. id) | ✅ | User-defined primary key — identifies which row was updated |
| rows | non-null columns | ✅ | All columns from the updated row that have a non-null value |
| old_values | _seq | ✅ | Previous sequence number |
| old_values | <pk> | ✅ | Primary key (same as in rows) |
| old_values | changed columns | only if changed | Previous values of columns that actually changed |
_deleted and other system columns are not included in UPDATE notifications. DELETE events
are sent as a separate notification with change_type === "delete".
Detecting which columns changed
Compare rows against old_values. The keys present in old_values (excluding system keys
starting with _ and PK columns) are exactly the columns that changed:
const changedCols = Object.keys(event.old_values[0])
  .filter(k => !k.startsWith('_') && k !== 'id'); // replace 'id' with your PK

Accessing previous values
old_values contains the previous values for only the changed columns (plus _seq and PK):
const unsub = await client.subscribeWithSql("SELECT * FROM app.orders WHERE customer_id = 42", (event) => {
if (event.type !== MessageType.Change) return;
if (event.change_type !== ChangeType.Update) return;
for (let i = 0; i < event.rows.length; i++) {
const newRow = event.rows[i];
const oldRow = event.old_values?.[i];
// Identify the row
const rowId = newRow.id; // user-defined PK
const seq = newRow._seq; // new sequence number
// Which columns actually changed? (inspect old_values keys)
const changedCols = Object.keys(oldRow ?? {}).filter(k => !k.startsWith('_'));
console.log(`Row ${rowId} updated. Changed: ${changedCols.join(', ')}`);
// All non-null values are available in newRow
console.log(`Current state: ${JSON.stringify(newRow)}`);
if (changedCols.includes('status')) {
console.log(`Status: ${oldRow?.status} → ${newRow.status}`);
}
}
});

For subscribeRows(), the same data is available as change.rows and change.oldValues, while the raw discriminator is change.raw.change_type.
Using tables as change triggers
Because UPDATE notifications include all non-null values, you can use a table as a change-trigger without needing to re-query for the full row state:
// An "updates" table used as a trigger — every column is sent on each update
// CREATE USER TABLE updates (
// id TEXT PRIMARY KEY,
// update_type TEXT NOT NULL,
// target_id TEXT NOT NULL,
// updated TIMESTAMP NOT NULL,
// created TIMESTAMP NOT NULL
// )
client.subscribe('app.updates', (event) => {
if (event.change_type === ChangeType.Update) {
// All non-null columns are present — no need to re-query
const row = event.rows[0];
console.log(`Trigger: ${row.update_type} on ${row.target_id} at ${row.updated}`);
}
});

Full row state after an update
Since rows already contains all non-null columns, you can directly replace your local
copy without merging:
const rowMap = new Map(initialRows.map(r => [r.id, r]));
client.subscribe('app.orders', (event) => {
if (event.change_type === ChangeType.Update) {
for (const row of event.rows) {
// rows already has all non-null columns — safe to replace directly
rowMap.set(row.id, { ...rowMap.get(row.id), ...row });
}
}
});

Subscription introspection
client.getSubscriptionCount();
client.getSubscriptions();
client.isSubscribedTo('app.messages');
client.getLastSeqId('<subscription-id>');

getLastSeqId() is useful for checkpointing consumption progress.
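One way to use it: periodically snapshot the last applied SeqId into durable storage. The interface below is a minimal stub of the assumed client surface so the sketch stays self-contained; the real client exposes getLastSeqId() as shown above, and the save callback stands in for whatever storage you use.

```typescript
// Minimal stub of the assumed surface; the real client provides this method.
interface CheckpointSource {
  getLastSeqId(subscriptionId: string): { toString(): string } | undefined;
}

// Sketch (not SDK API): persist the current checkpoint if one exists.
function snapshotCheckpoint(
  client: CheckpointSource,
  subscriptionId: string,
  save: (seqText: string) => void,
): boolean {
  const seq = client.getLastSeqId(subscriptionId);
  if (seq === undefined) return false;
  save(seq.toString());
  return true;
}
```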
Error handling guidance
- Parse event.type === MessageType.Error and inspect code/message.
- Keep callbacks resilient (non-throwing) to avoid app-level disruption.
- For mission-critical streams, persist last sequence IDs and re-subscribe with from.
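The second point can be packaged as a wrapper: a sketch (not an SDK API) that catches anything a handler throws and routes it to a failure callback, instead of letting the exception escape into the SDK's dispatch path.

```typescript
// Sketch (not SDK API): make any event handler non-throwing.
function resilient<E>(
  handler: (event: E) => void,
  onFailure: (err: unknown) => void = (err) => console.error('handler failed:', err),
): (event: E) => void {
  return (event: E) => {
    try {
      handler(event);
    } catch (err) {
      // Swallow the error after reporting it; the subscription keeps running.
      onFailure(err);
    }
  };
}
```

Wrap any callback you pass to subscribe(), live(), or subscribeWithSql(): `client.subscribe('app.messages', resilient(cb))`.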