Realtime Subscriptions
@kalamdb/client supports realtime subscriptions via WebSocket.
For application UI, start with live(). It returns the latest materialized row set, which is usually what your component state actually needs.
Live SQL must stay within the strict supported shape: SELECT ... FROM ... WHERE ....
Do not include ORDER BY or LIMIT inside live() or subscribeWithSql() SQL.
Use subscriptionOptions.last_rows for the initial rewind window and limit
to keep the client-side materialized row set bounded over time.
The controls are intentionally separate:
- subscriptionOptions.batch_size chunks the initial snapshot coming from the server.
- subscriptionOptions.last_rows selects how much history to rewind before live changes begin.
- limit bounds the materialized live row set that the client keeps after startup.
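As a rough illustration of how the three controls sit side by side, the sketch below declares a local options shape. The interface name LiveOptionsSketch and the placement of limit next to subscriptionOptions are assumptions made for illustration; only the field names batch_size, last_rows, and limit come from the text above, so check the SDK's exported types for the real shape.

```typescript
// Hypothetical local shape; NOT imported from @kalamdb/client.
interface LiveOptionsSketch {
  subscriptionOptions: {
    batch_size?: number; // rows per chunk of the initial snapshot
    last_rows?: number; // how much history to rewind before live changes
  };
  limit?: number; // upper bound on the client-side materialized row set
}

// Example values are arbitrary: rewind 50 rows, stream them in chunks
// of 200, and never keep more than 500 rows in memory afterwards.
const chatOptions: LiveOptionsSketch = {
  subscriptionOptions: { batch_size: 200, last_rows: 50 },
  limit: 500,
};
```

Keeping the three values in one literal makes it easier to see that they tune different phases: transfer chunking, startup history, and steady-state memory.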
Recommended: materialized live rows
Use liveTableRows() for the same behavior with table-name sugar:
This path is implemented in the shared Rust core, so TypeScript and Dart use the same row materialization behavior by default.
Resume from a specific SeqId
Persist the latest applied SeqId and feed it back into subscriptionOptions.from on the next session.
This is the recommended pattern for chat timelines, activity feeds, audit streams, and reconnect-heavy mobile sessions.
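The persistence side of this pattern might look like the sketch below. It does not call the SDK: CheckpointStore, MemoryStore, saveCheckpoint, and resumeOptions are hypothetical helpers, and it assumes a SeqId can be round-tripped through a string. The object returned by resumeOptions is meant to be merged into subscriptionOptions on the next session.

```typescript
// Hypothetical key-value storage; swap in localStorage, a file, etc.
interface CheckpointStore {
  get(key: string): string | undefined;
  set(key: string, value: string): void;
}

// In-memory stand-in so the sketch runs anywhere.
class MemoryStore implements CheckpointStore {
  private data = new Map<string, string>();
  get(key: string): string | undefined {
    return this.data.get(key);
  }
  set(key: string, value: string): void {
    this.data.set(key, value);
  }
}

// Assumption: the SeqId is stored in its string form.
function saveCheckpoint(store: CheckpointStore, feed: string, seqId: string): void {
  store.set("seq:" + feed, seqId);
}

// Builds the resume options; `from` is omitted on the very first run.
function resumeOptions(store: CheckpointStore, feed: string): { from?: string } {
  const from = store.get("seq:" + feed);
  return from === undefined ? {} : { from };
}
```

Saving the checkpoint only after a change has actually been applied to local state keeps a crash between receipt and apply from skipping events on resume.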
Table subscription sugar
If you only need SELECT * FROM table, use the shorthand APIs:
When to use which API
- Use live() or liveTableRows() when you want the latest reconciled row array.
- Use subscribeWithSql() when you need low-level subscription_ack, initial_data_batch, change, or error frames.
Low-level SQL subscription
When you want explicit control over paged initial data, use subscribeWithSqlHandle().
It returns the subscription id plus a requestNextBatch() method. Set
auto_fetch_batches: false to keep startup on the first batch until your UI
asks for the next one.
Set auto_fetch_batches: true when you want the browser client to request all
remaining initial batches automatically. The Node/native client path keeps its
existing automatic batch behavior unless you set auto_fetch_batches: false.
Concurrency notes
The TypeScript SDK test suite exercises shared-socket multiplexing, multi-client fan-out, and many simultaneous subscriptions on one client. In practice, this is the expected production shape: several listeners can share one client connection while concurrent writers continue to publish into the same table.
Custom row identity in TypeScript
By default, the high-level API reconciles rows using the row id field in the Rust core.
If your query does not expose a stable id, prefer declarative keyColumns so reconciliation still stays inside the shared Rust core:
Use getKey only when the identity must be derived by arbitrary JavaScript code:
When getKey is provided, reconciliation falls back to the TypeScript SDK layer because arbitrary JavaScript callbacks cannot be shared through the Rust core.
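To make the idea of column-based identity concrete, here is a conceptual sketch of keyed reconciliation in plain TypeScript. It is not the Rust core's implementation; keyFromColumns and reconcile are hypothetical helpers, and the column names in the usage note are made up.

```typescript
type Row = Record<string, unknown>;

// Builds a stable identity string from the listed columns, in order.
// JSON encoding keeps 1 and "1" distinct and avoids separator clashes.
function keyFromColumns(row: Row, keyColumns: string[]): string {
  return JSON.stringify(keyColumns.map((column) => row[column] ?? null));
}

// Upserts incoming rows into a keyed map: a row with a known key
// replaces the previous version, a row with a new key is added.
function reconcile(
  current: Map<string, Row>,
  incoming: Row[],
  keyColumns: string[],
): Map<string, Row> {
  const next = new Map(current);
  for (const row of incoming) {
    next.set(keyFromColumns(row, keyColumns), row);
  }
  return next;
}
```

With keyColumns of ["room_id", "user_id"], two versions of the same (room, user) pair collapse into one entry, which is the behavior a declarative key is meant to give you without a custom callback.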
Typed row subscriptions (subscribeRows)
subscribeRows<T>() wraps incoming rows as KalamRow<T> so you can use:
- row.data for raw values
- row.cell('col') for KalamCellValue
- row.file('file_col') for bound file URLs
Event model
Use runtime enums from types.ts:
Event sequence commonly observed:
1. subscription_ack
2. one or more initial_data_batch
3. change events (insert | update | delete)
4. optional error
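A handler for this sequence can be sketched as a switch over the frame type. The Frame union below is a local, hypothetical shape: only the frame names, the change types, and the code/message fields on errors come from this page, and the string type for code is an assumption. Real code should use the SDK's runtime enums rather than string literals.

```typescript
// Hypothetical frame shapes for illustration; not the SDK's types.
type Frame =
  | { type: "subscription_ack" }
  | { type: "initial_data_batch"; rows: Record<string, unknown>[] }
  | {
      type: "change";
      change_type: "insert" | "update" | "delete";
      rows: Record<string, unknown>[];
    }
  | { type: "error"; code: string; message: string };

// Returns a short description per frame; the switch is exhaustive,
// so adding a new frame type to the union becomes a compile error here.
function describeFrame(frame: Frame): string {
  switch (frame.type) {
    case "subscription_ack":
      return "subscribed";
    case "initial_data_batch":
      return "snapshot batch of " + frame.rows.length + " row(s)";
    case "change":
      return frame.change_type + " of " + frame.rows.length + " row(s)";
    case "error":
      return "error " + frame.code + ": " + frame.message;
  }
}
```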
Subscription options
Use subscriptionOptions.from for resume patterns after reconnect. The SDK only
tracks this single SeqId; snapshot and commit boundaries are owned by the
server and are not part of the client API.
SQL safety for dynamic filters
subscribeWithSql() accepts a SQL string. For dynamic values:
- Prefer validated numeric IDs before interpolation.
- Do not interpolate raw user text directly into SQL.
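One way to follow both rules is to validate before building the string, as in this sketch. The table names, the room_id column, and the buildRoomFilterSql helper are hypothetical; the resulting SQL stays within the SELECT ... FROM ... WHERE shape and would be passed to subscribeWithSql().

```typescript
// Allow-list of tables the UI may subscribe to (hypothetical names).
const ALLOWED_TABLES = ["app.messages", "app.events"];

function buildRoomFilterSql(table: string, roomId: unknown): string {
  // Table names cannot be validated numerically, so use an allow-list.
  if (ALLOWED_TABLES.indexOf(table) === -1) {
    throw new Error("table not allowed: " + table);
  }
  // Accept only safe non-negative integers; a string such as
  // "1 OR 1=1" is rejected instead of being interpolated.
  if (typeof roomId !== "number" || !Number.isSafeInteger(roomId) || roomId < 0) {
    throw new Error("roomId must be a non-negative integer");
  }
  return "SELECT * FROM " + table + " WHERE room_id = " + roomId;
}
```

Rejecting anything that is not already a number, rather than parsing strings, keeps partially numeric input like "42abc" from slipping through.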
Unsubscribe patterns
UPDATE notification rows
For change_type === "update" events, the server sends:
- rows: a full snapshot of all non-null columns in the updated row, plus the primary key column(s) and _seq. This means you always receive every non-null value, not just the columns that changed. This is particularly useful when using a table as a change trigger (e.g. an updates/events table where every column matters on each write).
- old_values: only the columns that actually changed, plus _seq and PK, with their previous values.
What each row contains
| Field | Column | Always present | Description |
|---|---|---|---|
| rows | _seq | ✅ | Monotonically increasing sequence number; identifies the row version |
| rows | <pk> (e.g. id) | ✅ | User-defined primary key; identifies which row was updated |
| rows | non-null columns | ✅ | All columns from the updated row that have a non-null value |
| old_values | _seq | ✅ | Previous sequence number |
| old_values | <pk> | ✅ | Primary key (same as in rows) |
| old_values | changed columns | only if changed | Previous values of columns that actually changed |
_deleted and other system columns are not included in UPDATE notifications. DELETE events
are sent as a separate notification with change_type === "delete".
Detecting which columns changed
Compare rows against old_values. The keys present in old_values (excluding system keys
starting with _ and PK columns) are exactly the columns that changed:
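The rule above can be sketched as a small pure function. The changedColumns helper and the pkColumns parameter are hypothetical; the caller is assumed to know the table's primary key columns.

```typescript
type Row = Record<string, unknown>;

// Returns the user columns that changed: every key of old_values
// except system keys (leading underscore) and primary key columns.
function changedColumns(oldValues: Row, pkColumns: string[]): string[] {
  return Object.keys(oldValues).filter(
    (key) => key.charAt(0) !== "_" && pkColumns.indexOf(key) === -1,
  );
}
```

For an old_values object of { _seq: 10, id: 1, status: "open" } with a primary key of id, the function returns ["status"].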
Accessing previous values
old_values contains the previous values for only the changed columns (plus _seq and PK):
For subscribeRows(), the same data is available as change.rows and change.oldValues, while the raw discriminator is change.raw.change_type.
Using tables as change triggers
Because UPDATE notifications include all non-null values, you can use a table as a change-trigger without needing to re-query for the full row state:
Full row state after an update
Since rows already contains all non-null columns, you can directly replace your local
copy without merging:
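A minimal sketch of that replacement, assuming a single primary key column and a local cache keyed by its string form (applyUpdate and the cache are hypothetical, not part of the SDK):

```typescript
type Row = Record<string, unknown>;

// Replaces the cached row wholesale. Because the notification carries
// every non-null column, a column missing from `updated` is now null,
// and replacing (rather than merging) drops its stale value.
function applyUpdate(cache: Map<string, Row>, updated: Row, pkColumn: string): void {
  cache.set(String(updated[pkColumn]), updated);
}
```

Merging with a spread would keep the old value of a column that was just set to null, which is why replacement is the safer default here.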
Subscription introspection
getLastSeqId() is useful for checkpointing consumption progress.
Error handling guidance
- Check for event.type === MessageType.Error and inspect code/message.
- Keep callbacks resilient (non-throwing) to avoid app-level disruption.
- For mission-critical streams, persist last sequence IDs and re-subscribe with from.
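Keeping callbacks non-throwing can be done with a small wrapper, sketched below. The resilient helper is hypothetical and independent of the SDK; it simply routes any exception from your handler to a failure hook instead of letting it escape into the subscription machinery.

```typescript
// Wraps a handler so that exceptions are reported, not propagated.
function resilient<T>(
  handler: (event: T) => void,
  onFailure: (err: unknown) => void,
): (event: T) => void {
  return (event: T): void => {
    try {
      handler(event);
    } catch (err) {
      onFailure(err);
    }
  };
}
```

Pass the wrapped function wherever you would pass the raw callback, and use onFailure for logging or metrics so failures stay visible.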