Skip to Content
Advanced Mode

Advanced Realtime Mode

Use advanced mode when you want protocol events instead of a pre-materialized row set.

If your UI only needs current rows, use live() or liveTable() first. They expose onCheckpoint too, so durable resume does not require raw event handling.

liveEvents(sql, onEvent, options?) is the low-level API.

Low-level SQL events

ts snippetTS
import { ChangeType, MessageType } from '@kalamdb/client'; const stop = await client.liveEvents(  'SELECT * FROM app.messages WHERE conversation_id = 42',  (event) => {    if (event.type !== MessageType.Change) {      return;    }     if (event.change_type === ChangeType.Insert) {      console.log(event.rows);    }  },  {    batchSize: 100,    lastRows: 50,  },); await stop();

Use this path for protocol tooling, debugging, custom reconciliation, or event processors that need direct access to subscription_ack, initial_data_batch, change, and error frames.

Options

ts snippetTS
{  batchSize?: number;  lastRows?: number;  from?: SeqId | number | string;  autoFetchBatches?: boolean;  onCheckpoint?: ({ lastSeqId, subscriptionId }) => void;  onError?: (event) => void;}
  • batchSize chunks the initial snapshot from the server.
  • lastRows includes the last N rows before live changes begin.
  • from resumes from a known SeqId checkpoint.
  • autoFetchBatches controls whether the client asks for remaining startup batches automatically.
  • onCheckpoint fires whenever the SDK observes a newer applied sequence ID.
  • onError receives protocol error frames without forcing your event callback to branch on every error.

Checkpoints

ts snippetTS
let latestSeq; const stop = await client.liveEvents(  'SELECT id, body FROM app.messages WHERE conversation_id = 42',  (event) => {    if (event.type === MessageType.Change) {      applyChange(event.change_type, event.rows, event.old_values);    }  },  {    lastRows: 20,    from: latestSeq,    onCheckpoint: ({ lastSeqId }) => {      latestSeq = lastSeqId;      localStorage.setItem('messages.seq', lastSeqId.toString());    },  },);

The checkpoint is a single SeqId. Snapshot and commit boundaries stay server-owned.

Event model

Use runtime enums from @kalamdb/client:

ts snippetTS
import { MessageType, ChangeType } from '@kalamdb/client';

Common sequence:

  1. subscription_ack
  2. one or more initial_data_batch
  3. change events with insert, update, or delete
  4. optional error

SQL safety for dynamic filters

liveEvents() accepts a SQL string. For dynamic values:

  • Prefer validated numeric IDs before interpolation.
  • Do not interpolate raw user text directly into SQL.
ts snippetTS
function buildConversationSql(conversationId: number): string {  if (!Number.isInteger(conversationId) || conversationId <= 0) {    throw new Error('conversationId must be a positive integer');  }   return `SELECT * FROM app.messages WHERE conversation_id = ${conversationId}`;}
Last updated on