Realtime Subscriptions

@kalamdb/client supports realtime subscriptions via WebSocket.

For application UI, start with live(). It returns the latest materialized row set, which is usually what your component state actually needs.

Live SQL must stay within the strict supported shape: SELECT ... FROM ... WHERE .... Do not include ORDER BY or LIMIT inside live() or subscribeWithSql() SQL.

    const inboxSql = `
      SELECT id, room, role, body, created_at
      FROM support.inbox
      WHERE room = 'main'
    `;

    const unsub = await client.live(
      inboxSql,
      (rows) => {
        // USER tables are tenant-private.
        // The same SQL can run for every signed-in account, but the callback only
        // receives rows from that caller's own table partition.
        renderInbox(rows);
      },
      {
        subscriptionOptions: { last_rows: 100 },
        onError: (event) => {
          console.error(event.code, event.message);
        },
      },
    );
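Because live SQL does not accept ORDER BY or LIMIT, it can help to catch such statements before a subscription is opened. The following is a minimal sketch of a client-side guard. assertLiveSqlShape is a hypothetical helper, not part of @kalamdb/client, and it uses a keyword check rather than a real SQL parser, so it may also reject a string literal that happens to contain those keywords.

```typescript
// Hypothetical helper, not part of @kalamdb/client.
// Normalizes whitespace and rejects clauses that live SQL does not accept.
function assertLiveSqlShape(sql: string): string {
  const normalized = sql.replace(/\s+/g, ' ').trim();

  if (!/^select\s.+\sfrom\s/i.test(normalized)) {
    throw new Error('live SQL must have the shape SELECT ... FROM ...');
  }
  if (/\border\s+by\b/i.test(normalized)) {
    throw new Error('live SQL must not contain ORDER BY');
  }
  if (/\blimit\b/i.test(normalized)) {
    throw new Error('live SQL must not contain LIMIT');
  }
  return normalized;
}
```

If ordering matters for rendering, one option is to sort the row array inside the callback instead of in the SQL.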

Use liveTableRows() for the same behavior with table-name sugar:

    const unsub = await client.liveTableRows('support.inbox', (rows) => {
      renderInbox(rows);
    });

This path is implemented in the shared Rust core, so TypeScript and Dart use the same row materialization behavior by default.

Resume from a specific SeqId

Persist the latest applied SeqId and feed it back into subscriptionOptions.from on the next session.

    import { SeqId } from '@kalamdb/client';

    const inboxSql = `
      SELECT id, room, role, body, created_at
      FROM support.inbox
      WHERE room = 'main'
    `;

    const savedSeqText = localStorage.getItem('support.inbox.seq');
    const startFrom = savedSeqText ? SeqId.from(savedSeqText) : undefined;

    const unsub = await client.live(
      inboxSql,
      (rows) => {
        renderInbox(rows);

        // Save the latest checkpoint after each applied snapshot.
        const active = client.getSubscriptions().find((sub) => sub.tableName === inboxSql);
        if (active?.lastSeqId) {
          localStorage.setItem('support.inbox.seq', active.lastSeqId.toString());
        }
      },
      {
        subscriptionOptions: {
          last_rows: 100,
          ...(startFrom ? { from: startFrom } : {}),
        },
      },
    );

This is the recommended pattern for chat timelines, activity feeds, audit streams, and reconnect-heavy mobile sessions.
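When several callbacks or tabs can write the same checkpoint, it may be worth making sure the stored value never moves backwards. The sketch below shows one way to do that. CheckpointStorage, loadCheckpoint, and saveCheckpoint are hypothetical names, and the comparison assumes that the text form of a SeqId is a non-negative decimal integer, which this page does not confirm.

```typescript
// Minimal storage contract; the browser's localStorage satisfies it.
interface CheckpointStorage {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

// Compares two non-negative decimal integer strings without precision loss.
function compareSeqText(a: string, b: string): number {
  const x = a.replace(/^0+(?=\d)/, '');
  const y = b.replace(/^0+(?=\d)/, '');
  if (x.length !== y.length) return x.length < y.length ? -1 : 1;
  if (x === y) return 0;
  return x < y ? -1 : 1;
}

function loadCheckpoint(storage: CheckpointStorage, key: string): string | undefined {
  const saved = storage.getItem(key);
  return saved === null ? undefined : saved;
}

// Writes seqText only when it is newer than the stored value.
// Returns true when the checkpoint was advanced.
// ASSUMPTION: seqText is a decimal integer string.
function saveCheckpoint(storage: CheckpointStorage, key: string, seqText: string): boolean {
  if (!/^\d+$/.test(seqText)) {
    throw new Error('expected a decimal sequence id');
  }
  const previous = loadCheckpoint(storage, key);
  if (previous !== undefined && compareSeqText(seqText, previous) <= 0) {
    return false;
  }
  storage.setItem(key, seqText);
  return true;
}
```

In a browser, localStorage can be passed as the storage argument; in tests, a small Map-backed object is enough.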

Table subscription sugar

If you only need SELECT * FROM table, use the shorthand APIs:

    const lowLevel = await client.subscribe('app.messages', (event) => {
      console.log(event.type);
    });

    const highLevel = await client.liveTableRows('support.inbox', (rows) => {
      renderInbox(rows);
    });

When to use which API

  • Use live() or liveTableRows() when you want the latest reconciled row array.
  • Use subscribeWithSql() when you need low-level subscription_ack, initial_data_batch, change, or error frames.

Low-level SQL subscription

    import { ChangeType, MessageType } from '@kalamdb/client';

    const unsub = await client.subscribeWithSql(
      'SELECT * FROM app.messages WHERE conversation_id = 42',
      (event) => {
        if (event.type !== MessageType.Change) {
          return;
        }
        if (event.change_type === ChangeType.Insert) {
          console.log(event.rows);
        }
      },
      { batch_size: 100, last_rows: 50 },
    );

Concurrency notes

The TypeScript SDK test suite exercises shared-socket multiplexing, multi-client fan-out, and many simultaneous subscriptions on one client. In practice, this is the expected production shape: several listeners can share one client connection while concurrent writers continue to publish into the same table.

Custom row identity in TypeScript

By default, the high-level API reconciles rows using the row id field in the Rust core.

If your query does not expose a stable id, prefer declarative keyColumns so reconciliation still stays inside the shared Rust core:

    const unsub = await client.live(
      "SELECT room, message_id, body, created_at FROM chat.messages WHERE room = 'main'",
      (rows) => {
        console.log(rows);
      },
      {
        keyColumns: ['room', 'message_id'],
      },
    );

Use getKey only when the identity must be derived by arbitrary JavaScript code:

    const unsub = await client.live(
      "SELECT room, created_at, body FROM chat.messages WHERE room = 'main'",
      (rows) => {
        console.log(rows);
      },
      {
        getKey: (row) => `${row.room.asString()}:${row.created_at.asString()}`,
      },
    );

When getKey is provided, reconciliation falls back to the TypeScript SDK layer because arbitrary JavaScript callbacks cannot be shared through the Rust core.
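To make the idea of key-based reconciliation concrete, here is a conceptual sketch of how a composite key could identify rows and how incoming rows could replace earlier versions. It is an illustration only and not the Rust core's actual implementation. PlainRow, compositeKey, and upsertByKey are hypothetical names that operate on plain objects rather than SDK row types.

```typescript
type PlainRow = Record<string, unknown>;

// Builds one identity string from several columns.
// JSON-encoding the tuple keeps ('a:b', 'c') and ('a', 'b:c') distinct.
function compositeKey(row: PlainRow, keyColumns: string[]): string {
  const parts = keyColumns.map((col) => {
    if (!(col in row)) {
      throw new Error(`missing key column: ${col}`);
    }
    return row[col];
  });
  return JSON.stringify(parts);
}

// Replaces rows that share a key and appends unseen ones.
// A Map keeps first-seen order, so an updated row stays in its position.
function upsertByKey(current: PlainRow[], incoming: PlainRow[], keyColumns: string[]): PlainRow[] {
  const byKey = new Map<string, PlainRow>();
  for (const row of current) {
    byKey.set(compositeKey(row, keyColumns), row);
  }
  for (const row of incoming) {
    byKey.set(compositeKey(row, keyColumns), row);
  }
  return Array.from(byKey.values());
}
```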

Typed row subscriptions (subscribeRows)

subscribeRows<T>() wraps incoming rows as KalamRow<T> so you can use:

  • row.data for raw values
  • row.cell('col') for KalamCellValue
  • row.file('file_col') for bound file URLs

    import { ChangeType, MessageType } from '@kalamdb/client';

    type User = { id: string; name: string; avatar: unknown };

    const unsub = await client.subscribeRows<User>('app.users', (change) => {
      if (change.type === MessageType.InitialDataBatch) {
        console.log('initial rows:', change.rows.length);
        return;
      }

      if (change.type === MessageType.Change && change.raw.change_type === ChangeType.Insert) {
        for (const row of change.rows) {
          console.log('insert user:', row.data.name);
          const avatar = row.file('avatar');
          if (avatar) console.log(avatar.downloadUrl());
        }
      }

      if (change.type === MessageType.Change && change.raw.change_type === ChangeType.Update) {
        const before = change.oldValues[0]?.data;
        const after = change.rows[0]?.data;
        console.log('update:', before, '→', after);
      }
    });

Event model

Use runtime enums from types.ts:

import { MessageType, ChangeType } from '@kalamdb/client';

Event sequence commonly observed:

  1. subscription_ack
  2. one or more initial_data_batch
  3. change events (insert|update|delete)
  4. optional error
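The sequence above can be tracked with a small state machine, for example to drive a connection status indicator or structured logging. This is a sketch under assumptions: the frame names come from the list above, but the phase names are invented for illustration, and treating an error as terminal is a simplification that the SDK may not share.

```typescript
type FrameKind = 'subscription_ack' | 'initial_data_batch' | 'change' | 'error';
type StreamPhase = 'pending' | 'acknowledged' | 'loading' | 'live' | 'failed';

// Derives the next phase from the current phase and the frame just received.
// ASSUMPTION: once an error frame is seen, the stream is considered failed.
function nextPhase(phase: StreamPhase, frame: FrameKind): StreamPhase {
  if (phase === 'failed') {
    return 'failed';
  }
  switch (frame) {
    case 'subscription_ack':
      return 'acknowledged';
    case 'initial_data_batch':
      return 'loading';
    case 'change':
      return 'live';
    case 'error':
      return 'failed';
    default:
      return phase;
  }
}
```

A low-level callback could map event.type to a FrameKind and feed it through nextPhase on every event.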

Subscription options

    {
      subscriptionOptions?: {
        batch_size?: number;
        last_rows?: number;
        from?: SeqId | number | string;
      };
    }

Use subscriptionOptions.from for resume patterns after reconnect.

SQL safety for dynamic filters

subscribeWithSql() accepts a SQL string. For dynamic values:

  • Prefer validated numeric IDs before interpolation.
  • Do not interpolate raw user text directly into SQL.

    function buildConversationSql(conversationId: number): string {
      if (!Number.isInteger(conversationId) || conversationId <= 0) {
        throw new Error('conversationId must be a positive integer');
      }
      return `SELECT * FROM chat.messages WHERE conversation_id = ${conversationId}`;
    }
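When the filter value is text rather than a number, one conservative option is to accept only values that match a strict allow-list pattern. The sketch below assumes room names are limited to letters, digits, underscores, and hyphens. buildRoomSql and that naming rule are hypothetical and should be adapted to your own schema.

```typescript
// Hypothetical helper: accepts only room names made of letters, digits, '_' and '-'.
function buildRoomSql(room: string): string {
  if (!/^[A-Za-z0-9_-]{1,64}$/.test(room)) {
    throw new Error('room must be 1-64 characters of [A-Za-z0-9_-]');
  }
  // The allow-list excludes quotes, whitespace, and semicolons,
  // so the value cannot terminate the string literal.
  return `SELECT * FROM chat.messages WHERE room = '${room}'`;
}
```

Values that fail the check are rejected outright instead of being escaped, which keeps the rule easy to audit.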

Unsubscribe patterns

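    // Option 1: call the function returned by subscribe().
    const unsub = await client.subscribe('app.messages', cb);
    await unsub();

    // Option 2: unsubscribe by subscription id.
    const subs = client.getSubscriptions();
    if (subs[0]) {
      await client.unsubscribe(subs[0].id);
    }

    // Option 3: remove every active subscription on this client.
    await client.unsubscribeAll();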

UPDATE notification rows

For change_type === "update" events, the server sends:

  • rows — a full snapshot of all non-null columns in the updated row, plus the primary key column(s) and _seq. This means you always receive every non-null value, not just the columns that changed. This is particularly useful when using a table as a change trigger (e.g. an updates/events table where every column matters on each write).
  • old_values — only the columns that actually changed, plus _seq and PK, with their previous values.

What each row contains

    Field      | Column           | Always present  | Description
    -----------|------------------|-----------------|---------------------------------------------------------------------
    rows       | _seq             | yes             | Monotonically increasing sequence number; identifies the row version
    rows       | <pk> (e.g. id)   | yes             | User-defined primary key; identifies which row was updated
    rows       | non-null columns | yes             | All columns from the updated row that have a non-null value
    old_values | _seq             | yes             | Previous sequence number
    old_values | <pk>             | yes             | Primary key (same as in rows)
    old_values | changed columns  | only if changed | Previous values of columns that actually changed

_deleted and other system columns are not included in UPDATE notifications. DELETE events are sent as a separate notification with change_type === "delete".

Detecting which columns changed

Compare rows against old_values. The keys present in old_values (excluding system keys starting with _ and PK columns) are exactly the columns that changed:

    const changedCols = Object.keys(event.old_values?.[0] ?? {})
      .filter((k) => !k.startsWith('_') && k !== 'id'); // replace 'id' with your PK
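The same rule can be wrapped in a reusable helper that also pairs each changed column with its previous and current value. This is a sketch over plain objects. WireRow, changedColumns, and describeChanges are hypothetical names, and the primary key columns are supplied by the caller.

```typescript
type WireRow = Record<string, unknown>;

// Lists the user columns present in an old_values entry, skipping system
// columns (leading underscore) and the given primary key columns.
function changedColumns(oldRow: WireRow | undefined, pkColumns: string[]): string[] {
  if (!oldRow) {
    return [];
  }
  return Object.keys(oldRow).filter((k) => !k.startsWith('_') && !pkColumns.includes(k));
}

// Pairs each changed column with its previous and current value.
// `after` is undefined when the column is absent from the new row,
// which happens for columns that are now NULL.
function describeChanges(
  newRow: WireRow,
  oldRow: WireRow | undefined,
  pkColumns: string[],
): Array<{ column: string; before: unknown; after: unknown }> {
  return changedColumns(oldRow, pkColumns).map((column) => ({
    column,
    before: oldRow ? oldRow[column] : undefined,
    after: newRow[column],
  }));
}
```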

Accessing previous values

old_values contains the previous values for only the changed columns (plus _seq and PK):

    import { ChangeType, MessageType } from '@kalamdb/client';

    const unsub = await client.subscribeWithSql(
      'SELECT * FROM app.orders WHERE customer_id = 42',
      (event) => {
        if (event.type !== MessageType.Change) return;
        if (event.change_type !== ChangeType.Update) return;

        for (let i = 0; i < event.rows.length; i++) {
          const newRow = event.rows[i];
          const oldRow = event.old_values?.[i];

          // Identify the row
          const rowId = newRow.id; // user-defined PK
          const seq = newRow._seq; // new sequence number

          // Which columns actually changed? Inspect old_values keys,
          // skipping system columns and the PK (replace 'id' with your PK).
          const changedCols = Object.keys(oldRow ?? {})
            .filter((k) => !k.startsWith('_') && k !== 'id');
          console.log(`Row ${rowId} (seq ${seq}) updated. Changed: ${changedCols.join(', ')}`);

          // All non-null values are available in newRow
          console.log(`Current state: ${JSON.stringify(newRow)}`);

          if (changedCols.includes('status')) {
            console.log(`Status: ${oldRow?.status} → ${newRow.status}`);
          }
        }
      },
    );

For subscribeRows(), the same data is available as change.rows and change.oldValues, while the raw discriminator is change.raw.change_type.

Using tables as change triggers

Because UPDATE notifications include all non-null values, you can use a table as a change-trigger without needing to re-query for the full row state:

    // An "updates" table used as a trigger: every column is sent on each update
    // CREATE USER TABLE updates (
    //   id TEXT PRIMARY KEY,
    //   update_type TEXT NOT NULL,
    //   target_id TEXT NOT NULL,
    //   updated TIMESTAMP NOT NULL,
    //   created TIMESTAMP NOT NULL
    // )

    client.subscribe('app.updates', (event) => {
      if (event.change_type === ChangeType.Update) {
        // All non-null columns are present, so there is no need to re-query
        const row = event.rows[0];
        console.log(`Trigger: ${row.update_type} on ${row.target_id} at ${row.updated}`);
      }
    });

Full row state after an update

Since rows already contains all non-null columns, you can directly replace your local copy without merging:

    const rowMap = new Map(initialRows.map((r) => [r.id, r] as const));

    client.subscribe('app.orders', (event) => {
      if (event.change_type === ChangeType.Update) {
        for (const row of event.rows) {
          // rows already has all non-null columns, so replace the local copy.
          // Merging with the old copy would keep stale values for columns
          // that were set to NULL by this update.
          rowMap.set(row.id, row);
        }
      }
    });
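A local cache usually has to handle inserts and deletes as well as updates. The sketch below applies all three change types to a Map. It assumes that delete notifications carry at least the primary key column, which this page does not state. LocalRow, ChangeKind, and applyChange are hypothetical names that operate on plain objects.

```typescript
type LocalRow = Record<string, unknown>;
type ChangeKind = 'insert' | 'update' | 'delete';

// Applies one change notification to a local cache keyed by primary key.
// ASSUMPTION: rows in a delete notification include the primary key column.
function applyChange(
  cache: Map<string, LocalRow>,
  kind: ChangeKind,
  rows: LocalRow[],
  pkColumn: string,
): void {
  for (const row of rows) {
    const key = String(row[pkColumn]);
    if (kind === 'delete') {
      cache.delete(key);
    } else {
      // Replace the whole row: columns that became NULL are absent from
      // the notification and therefore drop out of the cached copy.
      cache.set(key, row);
    }
  }
}
```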

Subscription introspection

    client.getSubscriptionCount();
    client.getSubscriptions();
    client.isSubscribedTo('app.messages');
    client.getLastSeqId('<subscription-id>');

getLastSeqId() is useful for checkpointing consumption progress.

Error handling guidance

  • Check for event.type === MessageType.Error and inspect the event's code and message.
  • Keep callbacks resilient (non-throwing) to avoid app-level disruption.
  • For mission-critical streams, persist last sequence IDs and re-subscribe with from.
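One way to keep callbacks non-throwing is to wrap them so that an exception in application code is reported rather than propagated. The following is a minimal sketch. resilient is a hypothetical helper, not part of @kalamdb/client, and it only covers synchronous handlers.

```typescript
// Wraps a handler so that a thrown exception is passed to onFailure
// instead of escaping into the code that invoked the callback.
function resilient<T>(
  handler: (value: T) => void,
  onFailure: (error: unknown) => void,
): (value: T) => void {
  return (value: T): void => {
    try {
      handler(value);
    } catch (error) {
      onFailure(error);
    }
  };
}
```

The wrapped function can be passed wherever a subscription callback is expected, with onFailure forwarding to your logging or telemetry.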