Realtime Subscriptions
The Dart/Flutter SDK supports “live queries”: you subscribe to a SELECT statement and receive a stream of change events over WebSocket.
The important ownership boundary is:
- Rust owns the shared connection, reconnect loop, replay filtering, and resume checkpoint tracking.
- Dart owns only the stream wrapper and typed row decoding.
That keeps reconnect behavior consistent with other SDKs and avoids re-implementing replay logic in Flutter apps.
On native Dart targets, subscription traffic uses MessagePack by default on that shared Rust-managed connection, which trims wire size and decode overhead compared with JSON.
Live SQL must follow the strict supported shape: SELECT ... FROM ... WHERE ....
Do not include ORDER BY or LIMIT in subscribe() or materialized live-query SQL.
Use:

```dart
Stream<ChangeEvent> subscribe(
  String sql, {
  int? batchSize,
  int? lastRows,
  SeqId? from,
  String? subscriptionId,
})
```

For low-level event handling, subscribe(...) is the right API.
Unlike query(...), the current live-query APIs take the final SQL string
directly and do not expose a separate params argument.
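Because there is no params argument, any dynamic value has to be embedded in the SQL string itself. A minimal sketch of doing that safely; the sqlQuote helper is illustrative and not part of the SDK:

```dart
// Hypothetical helper, not part of the SDK: embed a string value into
// live-query SQL by doubling single quotes, since subscribe() takes the
// final SQL string and has no separate params argument.
String sqlQuote(String value) => "'${value.replaceAll("'", "''")}'";

void main() {
  final room = "o'clock";
  final sql = 'SELECT * FROM app.messages WHERE room = ${sqlQuote(room)}';
  print(sql); // SELECT * FROM app.messages WHERE room = 'o''clock'
}
```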
The Dart SDK also exposes materialized live-row helpers when you want the SDK to keep the current row set reconciled for you:
- liveQueryRowsWithSql<T>(sql, ...)
- liveTableRows<T>(tableName, ...)
Which API to use
Choose the API based on the shape your app actually needs:
- Use subscribe(...) when you need low-level protocol events like AckEvent, batched snapshots, insert/update/delete diffs, or exact resume control in your own event loop.
- Use liveQueryRowsWithSql(...) when your UI wants the latest materialized row list for an arbitrary SQL query.
- Use liveTableRows(...) when SELECT * FROM namespace.table is enough.
For most widget state and list rendering, prefer the materialized helpers. They remove the need to reconcile InitialDataBatch, InsertEvent, UpdateEvent, and DeleteEvent yourself.
Materialized live rows
liveQueryRowsWithSql<T>(...) keeps the current row set updated inside the SDK
and emits the latest list whenever the query result changes.
```dart
final rowsStream = client.liveQueryRowsWithSql<Map<String, KalamCellValue>>(
  "SELECT id, room, body, created_at FROM app.messages WHERE room = 'main'",
  lastRows: 20,
);

await for (final rows in rowsStream) {
  for (final row in rows) {
    print('${row['id']?.asInt()} ${row['body']?.asString()}');
  }
}
```

Use liveTableRows<T>(...) when SELECT * FROM table is enough:
```dart
final taskRows = client.liveTableRows<Map<String, KalamCellValue>>('app.tasks');

await for (final rows in taskRows) {
  print('tasks: ${rows.length}');
}
```

If your live query uses a natural or composite key instead of a plain id column, pass keyColumns so reconciliation still stays inside the shared Rust core:
```dart
final messages = client.liveQueryRowsWithSql<Map<String, KalamCellValue>>(
  "SELECT room_id, message_id, body FROM app.messages WHERE room_id = 'main'",
  keyColumns: ['room_id', 'message_id'],
);
```

You can also cap the in-memory materialized row set with limit and map rows directly into app models with mapRow.
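The two can be combined. A sketch assuming the limit and mapRow parameter names described above; the Message model is illustrative:

```dart
// Illustrative app model; not part of the SDK.
class Message {
  final int id;
  final String body;
  Message(this.id, this.body);
}

final messages = client.liveQueryRowsWithSql<Message>(
  "SELECT id, body FROM app.messages WHERE room = 'main'",
  limit: 200, // cap the in-memory materialized row set
  mapRow: (row) => Message(
    row['id']?.asInt() ?? 0,
    row['body']?.asString() ?? '',
  ),
);
```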
Materialized rows vs change events
- subscribe(...) gives you protocol-level events and incremental changes.
- liveQueryRowsWithSql(...) and liveTableRows(...) give you the fully reconciled current row list.
- Materialized helpers are usually the better default for UI code.
- Low-level events are better when you need custom reconciliation, audit logging, or protocol inspection.
Materialized-row limits
The SQL for materialized live rows still follows the same strict supported shape:
SELECT ... FROM ... WHERE ...

- no ORDER BY
- no LIMIT
If you need presentation sorting, grouping, or capping, do that after rows arrive in Dart.
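A sketch of that client-side step. Rows are shown here as plain maps for brevity; in the SDK they are Map<String, KalamCellValue> and you would compare decoded values:

```dart
// Presentation-side sorting and capping, done in Dart after rows arrive,
// since ORDER BY and LIMIT are not allowed in live-query SQL.
List<Map<String, int>> newestFirst(List<Map<String, int>> rows, int cap) {
  final sorted = [...rows]
    ..sort((a, b) => (b['created_at'] ?? 0).compareTo(a['created_at'] ?? 0));
  return sorted.take(cap).toList();
}

void main() {
  final rows = [
    {'id': 1, 'created_at': 10},
    {'id': 2, 'created_at': 30},
    {'id': 3, 'created_at': 20},
  ];
  print(newestFirst(rows, 2).map((r) => r['id']).toList()); // [2, 3]
}
```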
Concurrency notes
The Dart SDK test suite covers multi-client fan-out and many simultaneous subscriptions on one shared client connection. That means the supported path is not just single-subscriber demos: concurrent writers and multiple active listeners on the same query are part of the exercised contract.
Basic subscription
```dart
final stream = client.subscribe(
  "SELECT * FROM app.messages WHERE room = 'main'",
);

await for (final event in stream) {
  switch (event) {
    case AckEvent(:final subscriptionId, :final schema, :final totalRows):
      print('ack $subscriptionId: $totalRows rows, columns=${schema.length}');
    case InitialDataBatch(:final rows, :final hasMore):
      print('snapshot batch: ${rows.length}, hasMore=$hasMore');
    case InsertEvent(:final row):
      print('insert: ${row['id']?.asInt()}');
    case UpdateEvent(:final row, :final oldRow):
      print('update: $oldRow -> $row');
    case DeleteEvent(:final row):
      print('delete: $row');
    case SubscriptionError(:final code, :final message):
      print('subscription error [$code]: $message');
  }
}
```

Snapshot vs live changes
Most subscriptions follow this pattern:
- AckEvent confirms the subscription and includes schema metadata.
- One or more InitialDataBatch events deliver the initial snapshot.
- InsertEvent / UpdateEvent / DeleteEvent deliver live changes.
If you only care about live changes, you can ignore InitialDataBatch after initial UI hydration.
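Since subscribe(...) returns an ordinary Dart Stream, a sketch of that is just a where filter:

```dart
// Sketch: drop snapshot and ack events, keeping only live changes.
final liveOnly = client
    .subscribe("SELECT * FROM app.messages WHERE room = 'main'")
    .where((event) => event is! InitialDataBatch && event is! AckEvent);

await for (final event in liveOnly) {
  // Only InsertEvent / UpdateEvent / DeleteEvent / SubscriptionError arrive here.
  print(event);
}
```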
Controlling the initial snapshot
batchSize
If the initial query returns many rows, the server can send it in batches. batchSize controls the maximum rows per snapshot batch.
```dart
final stream = client.subscribe(
  'SELECT * FROM app.large_table',
  batchSize: 100,
);
```

You will receive one or more InitialDataBatch events with hasMore=true, followed by hasMore=false and then live changes.
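Continuing that example, a sketch of collecting the whole snapshot before switching to live changes, using the event fields described in this section:

```dart
// Sketch: accumulate the batched snapshot before first render, then
// treat everything after the hasMore=false batch as live changes.
final snapshot = <Map<String, KalamCellValue>>[];
var hydrated = false;

await for (final event in stream) {
  if (event is InitialDataBatch) {
    snapshot.addAll(event.rows);
    if (!event.hasMore) {
      hydrated = true;
      print('snapshot complete: ${snapshot.length} rows');
    }
  } else if (hydrated) {
    // InsertEvent / UpdateEvent / DeleteEvent from here on.
    print('live change: $event');
  }
}
```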
lastRows
lastRows asks the server to “rewind” and include the last N rows before live changes begin. This is useful for chat timelines or activity feeds.
```dart
final stream = client.subscribe(
  "SELECT * FROM chat.messages WHERE room_id = 'main'",
  lastRows: 50,
);
```

subscriptionId
Provide an explicit subscriptionId if you want stable IDs for client-side bookkeeping:
```dart
final stream = client.subscribe(
  "SELECT * FROM app.messages WHERE room = 'main'",
  subscriptionId: 'messages-feed',
);
```

from
Use from to resume a subscription from a known sequence ID.
This is most useful when you:
- persist a checkpoint (last processed seq id) across app restarts
- want to re-subscribe without re-processing older events
```dart
final stream = client.subscribe(
  "SELECT * FROM app.messages WHERE room = 'main'",
  from: SeqId.parse('123'),
);
```

To discover the latest seq_id seen by the client, use getSubscriptions() and read SubscriptionInfo.lastSeqId.
When you resume with from, the intended contract is strict resume:
- rows with _seq <= from must not be replayed
- rows written while the connection was down must still arrive after reconnect
- new live rows after reconnect continue on the same stream
The Dart reconnect and resume test suite covers these cases with duplicate _seq
assertions so the SDK keeps a strict “no replay, no missed change” boundary.
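Putting from, subscriptionId, and getSubscriptions() together, a sketch of checkpoint persistence across restarts. The 'messages.seq' file and the assumption that sub.id matches the subscriptionId you passed are illustrative; any durable store works (this uses dart:io's File):

```dart
// Sketch: persist a resume checkpoint so a restart can subscribe with `from`.
final checkpoint = File('messages.seq');
final saved = checkpoint.existsSync() ? checkpoint.readAsStringSync() : null;

final stream = client.subscribe(
  "SELECT * FROM app.messages WHERE room = 'main'",
  subscriptionId: 'messages-feed',
  from: saved != null ? SeqId.parse(saved) : null,
);

await for (final event in stream) {
  // ... apply the event to app state ...

  // Record the latest seq id this client has seen for our subscription.
  final subs = await client.getSubscriptions();
  for (final sub in subs) {
    if (sub.id == 'messages-feed') {
      checkpoint.writeAsStringSync('${sub.lastSeqId}');
    }
  }
}
```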
Listing active subscriptions
Use getSubscriptions() to inspect all subscriptions on the shared connection.
```dart
final subs = await client.getSubscriptions();
for (final sub in subs) {
  print('${sub.id}: ${sub.query} (lastSeqId=${sub.lastSeqId}, closed=${sub.closed})');
}
```

For server-side observability while debugging, query SELECT * FROM system.live.
That view is node-local and reflects the subscriptions currently held in memory
on the node you query. It is not persisted to RocksDB and it is not replicated
through Raft.
Cancelling a subscription
The SDK returns a Stream<ChangeEvent>. Cancel by cancelling your StreamSubscription.
```dart
final sub = client.subscribe('SELECT * FROM app.messages').listen((_) {});
// later
await sub.cancel();
```

Cancellation stays lightweight on the Dart side. The SDK cancels the Rust subscription handle on the shared connection, waits briefly for the pull loop to unblock, and then closes the stream. That avoids leaving a hanging Dart-side pull loop after a widget or provider disposes.
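In Flutter code the usual place to cancel is State.dispose. A sketch assuming a client in scope; the widget names are illustrative (requires dart:async and package:flutter/material.dart):

```dart
// Sketch: tie cancellation to the widget lifecycle so the Rust-side
// subscription is released when the widget goes away.
class MessagesFeed extends StatefulWidget {
  const MessagesFeed({super.key});
  @override
  State<MessagesFeed> createState() => _MessagesFeedState();
}

class _MessagesFeedState extends State<MessagesFeed> {
  StreamSubscription<ChangeEvent>? _sub;

  @override
  void initState() {
    super.initState();
    _sub = client
        .subscribe("SELECT * FROM app.messages WHERE room = 'main'")
        .listen((event) => setState(() {/* apply event to local state */}));
  }

  @override
  void dispose() {
    _sub?.cancel(); // releases the Rust subscription handle
    super.dispose();
  }

  @override
  Widget build(BuildContext context) => const SizedBox.shrink();
}
```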
Data types
All subscription events expose rows as Map<String, KalamCellValue> for type-safe access:
- InitialDataBatch.rows → List<Map<String, KalamCellValue>>
- InsertEvent.row / InsertEvent.rows
- UpdateEvent.columnIndexes / UpdateEvent.row / UpdateEvent.oldRow / UpdateEvent.oldRows
- DeleteEvent.row / DeleteEvent.oldRows
UPDATE event behavior
For UpdateEvent, row and rows are sparse typed maps reconstructed from the Ack schema.
They contain only the changed columns plus the primary key columns and _seq.
oldRow contains the previous values for that same sparse column set.
columnIndexes exposes the original positional update layout from the wire protocol. It lines up
with the values sent on the Rust bridge raw event if you need exact sparse-column ordering.
```dart
case UpdateEvent(:final row, :final oldRow):
  // row is sparse: the changed columns plus the key columns and _seq
  final id = row['id']?.asString();
  final updateType = row['update_type']?.asString();
  final targetId = row['target_id']?.asString();
  print('Trigger: $updateType on $targetId');
  // oldRow holds the previous values for the same sparse column set
  for (final key in oldRow.keys) {
    if (!key.startsWith('_') && key != 'id') {
      print('  changed: $key: ${oldRow[key]} → ${row[key]}');
    }
  }
```

Use the typed accessors to extract values:
```dart
case InsertEvent(:final row):
  final id = row['id']?.asInt();
  final name = row['name']?.asString();
  final score = row['score']?.asDouble();
  final flag = row['active']?.asBool();
```

These event accessors are already decoded from JSON row payloads; no manual List<List<dynamic>> parsing is needed.
See Cell Values for the full KalamCellValue API.