Realtime Subscriptions

The Dart/Flutter SDK supports live queries over WebSocket.

The important ownership boundary is:

  • Rust owns the shared connection, reconnect loop, replay filtering, and resume checkpoint tracking.
  • Dart owns only the stream wrapper and typed row decoding.

That keeps reconnect behavior consistent with other SDKs and avoids re-implementing replay logic in Flutter apps.

On native Dart targets, subscription traffic uses MessagePack by default on that shared Rust-managed connection, which trims wire size and decode overhead compared with JSON.

Live SQL must follow the strict supported shape: SELECT ... FROM ... WHERE .... Do not include ORDER BY or LIMIT in liveEvents() or materialized live-query SQL.
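Because the shape rule is easy to violate when SQL strings are assembled at runtime, a pre-flight check before subscribing can help. The following is a minimal sketch, not part of the SDK: `checkLiveSqlShape` is a hypothetical helper, and it is a keyword heuristic rather than a SQL parser (a column literally named `limit` would trip it).

```dart
/// Hypothetical pre-flight check (not an SDK API).
/// Returns null when [sql] looks like SELECT ... FROM ... [WHERE ...],
/// otherwise a short description of the problem.
String? checkLiveSqlShape(String sql) {
  final text = sql.trim();
  if (!RegExp(r'^select\s', caseSensitive: false).hasMatch(text)) {
    return 'live SQL must start with SELECT';
  }
  // Blank out quoted string literals so keywords inside values are ignored.
  final withoutLiterals = text.replaceAll(RegExp(r"'(?:[^']|'')*'"), "''");
  if (!RegExp(r'\sfrom\s', caseSensitive: false).hasMatch(withoutLiterals)) {
    return 'live SQL must contain a FROM clause';
  }
  if (RegExp(r'\border\s+by\b', caseSensitive: false)
      .hasMatch(withoutLiterals)) {
    return 'ORDER BY is not supported in live SQL';
  }
  if (RegExp(r'\blimit\b', caseSensitive: false).hasMatch(withoutLiterals)) {
    return 'LIMIT is not supported in live SQL';
  }
  return null;
}
```

Calling this in debug builds before `liveEvents(...)` or `live(...)` surfaces a shape problem at the call site, independent of how the server reports it.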

Use:

```dart
Stream<ChangeEvent> liveEvents(
  String sql, {
  int? batchSize,
  int? lastRows,
  SeqId? from,
  String? subscriptionId,
})
```

For low-level event handling, liveEvents(...) is the right API.

Unlike query(...), the current live-query APIs take the final SQL string directly and do not expose a separate params argument.
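Since there is no params argument, any dynamic value has to be embedded in the SQL string itself. The sketch below shows one way to quote a string value; `sqlStringLiteral` and `roomMessagesSql` are hypothetical helpers, and the escaping assumes the server follows the standard SQL convention of doubling single quotes, which you should verify for your deployment.

```dart
/// Hypothetical helper (not an SDK API): wraps [value] in single quotes and
/// doubles any embedded single quote, the standard SQL literal escaping.
String sqlStringLiteral(String value) {
  final escaped = value.replaceAll("'", "''");
  return "'$escaped'";
}

/// Builds the final SQL string that a live-query call would receive.
String roomMessagesSql(String room) =>
    'SELECT * FROM app.messages WHERE room = ${sqlStringLiteral(room)}';
```

For values that come from untrusted input, an allow-list of known values is a safer choice than escaping alone.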

The Dart SDK also exposes materialized live-row helpers when you want the SDK to keep the current row set reconciled for you:

  • live<T>(sql, ...)
  • liveTable<T>(tableName, ...)

Which API to use

Choose the API based on the shape your app actually needs:

  • Use liveEvents(...) when you need low-level protocol events like AckEvent, batched snapshots, insert/update/delete diffs, or exact resume control in your own event loop.
  • Use live(...) when your UI wants the latest materialized row list for an arbitrary SQL query.
  • Use liveTable(...) when SELECT * FROM namespace.table is enough.

For most widget state and list rendering, prefer the materialized helpers. They remove the need to reconcile InitialDataBatch, InsertEvent, UpdateEvent, and DeleteEvent yourself.

Materialized live rows

live<T>(...) keeps the current row set updated inside the SDK and emits the latest list whenever the query result changes.

```dart
final rowsStream = client.live<Map<String, KalamCellValue>>(
  "SELECT id, room, body, created_at FROM app.messages WHERE room = 'main'",
  lastRows: 20,
  limit: 20,
  onCheckpoint: (checkpoint) {
    // persistSeqId is your app's own storage function.
    persistSeqId(checkpoint.lastSeqId.toString());
  },
);

await for (final rows in rowsStream) {
  for (final row in rows) {
    print('${row['id']?.asInt()} ${row['body']?.asString()}');
  }
}
```

Use liveTable<T>(...) when SELECT * FROM table is enough:

```dart
final taskRows = client.liveTable<Map<String, KalamCellValue>>('app.tasks');

await for (final rows in taskRows) {
  print('tasks: ${rows.length}');
}
```

If your live query uses a natural or composite key instead of a plain id column, pass keyColumns so reconciliation still stays inside the shared Rust core:

```dart
final messages = client.live<Map<String, KalamCellValue>>(
  "SELECT room_id, message_id, body FROM app.messages WHERE room_id = 'main'",
  keyColumns: ['room_id', 'message_id'],
);
```

You can also cap the in-memory materialized row set with limit, map rows directly into app models with mapRow, and persist a durable resume cursor with onCheckpoint.

Materialized rows vs change events

  • liveEvents(...) gives you protocol-level events and incremental changes.
  • live(...) and liveTable(...) give you the fully reconciled current row list.
  • Materialized helpers are usually the better default for UI code.
  • Low-level events are better when you need custom reconciliation, audit logging, or protocol inspection.

Materialized-row limits

The SQL for materialized live rows still follows the same strict supported shape:

  • SELECT ... FROM ... WHERE ...
  • no ORDER BY
  • no LIMIT

The controls apply at different stages:

  • batchSize chunks the initial snapshot sent by the server.
  • lastRows selects how much history to rewind before live changes begin.
  • limit caps the materialized live row set the SDK keeps after startup.

Because live SQL cannot contain ORDER BY, do sorting and grouping in Dart after rows arrive. Use limit only when the materialized live state itself should stay bounded.
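As an illustration of Dart-side ordering, here is a minimal sketch. `sortedBy` is a hypothetical helper, and the rows are plain maps standing in for the SDK's typed rows; in real code the key function would read a typed accessor such as `row['created_at']?.asInt()` and handle the null case.

```dart
/// Hypothetical helper (not an SDK API): returns a sorted copy of [rows],
/// leaving the list emitted by the stream untouched.
List<T> sortedBy<T>(
  List<T> rows,
  Comparable Function(T row) keyOf, {
  bool descending = false,
}) {
  final copy = List<T>.of(rows);
  copy.sort((a, b) {
    final order = Comparable.compare(keyOf(a), keyOf(b));
    return descending ? -order : order;
  });
  return copy;
}
```

Sorting a copy keeps the presentation step free of side effects on whatever list the stream handed you.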

Concurrency notes

The Dart SDK test suite covers multi-client fan-out and many simultaneous subscriptions on one shared client connection. That means the supported path is not just single-subscriber demos: concurrent writers and multiple active listeners on the same query are part of the exercised contract.

Basic subscription

```dart
final stream = client.liveEvents(
  "SELECT * FROM app.messages WHERE room = 'main'",
);

await for (final event in stream) {
  switch (event) {
    case AckEvent(:final subscriptionId, :final schema, :final totalRows):
      print('ack $subscriptionId: $totalRows rows, columns=${schema.length}');
    case InitialDataBatch(:final rows, :final hasMore):
      print('snapshot batch: ${rows.length}, hasMore=$hasMore');
    case InsertEvent(:final row):
      print('insert: ${row['id']?.asInt()}');
    case UpdateEvent(:final row, :final oldRow):
      print('update: $oldRow -> $row');
    case DeleteEvent(:final row):
      print('delete: $row');
    case SubscriptionError(:final code, :final message):
      print('subscription error [$code]: $message');
  }
}
```

Snapshot vs live changes

Most subscriptions follow this pattern:

  1. AckEvent confirms the subscription and includes schema metadata.
  2. One or more InitialDataBatch events deliver the initial snapshot.
  3. InsertEvent / UpdateEvent / DeleteEvent deliver live changes.

If you only care about live changes, you can ignore InitialDataBatch after initial UI hydration.
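If you do reconcile events yourself, the sequence above reduces to a small state machine. The sketch below uses stand-in classes (`Snapshot`, `Inserted`, `Updated`, `Deleted`) in place of the SDK's event types and plain maps in place of typed rows, and it assumes rows are keyed by an `id` column; it illustrates the idea and is not the SDK's implementation.

```dart
typedef Row = Map<String, Object?>;

// Stand-ins for InitialDataBatch, InsertEvent, UpdateEvent, and DeleteEvent.
sealed class FeedEvent {}

class Snapshot extends FeedEvent {
  Snapshot(this.rows);
  final List<Row> rows;
}

class Inserted extends FeedEvent {
  Inserted(this.row);
  final Row row;
}

class Updated extends FeedEvent {
  Updated(this.row); // sparse: changed columns plus the key
  final Row row;
}

class Deleted extends FeedEvent {
  Deleted(this.row);
  final Row row;
}

/// Applies one event to [state], which maps each row's `id` to its row.
void applyEvent(Map<Object?, Row> state, FeedEvent event) {
  switch (event) {
    case Snapshot(:final rows):
      for (final row in rows) {
        state[row['id']] = row;
      }
    case Inserted(:final row):
      state[row['id']] = row;
    case Updated(:final row):
      // Overlay the changed columns on the cached row, if there is one.
      state[row['id']] = {...?state[row['id']], ...row};
    case Deleted(:final row):
      state.remove(row['id']);
  }
}
```

The materialized helpers exist so that most apps never need to write this; it is mainly useful for custom reconciliation or for understanding what the helpers do on your behalf.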

Controlling the initial snapshot

batchSize

If the initial query returns many rows, the server can send it in batches. batchSize controls the maximum rows per snapshot batch.

```dart
final stream = client.liveEvents(
  'SELECT * FROM app.large_table',
  batchSize: 100,
);
```

InitialDataBatch events arrive with hasMore=true until the final batch, which has hasMore=false. Live changes follow after that final batch.
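One way to wait for the complete snapshot before rendering is to buffer batches until the final one arrives. This is a minimal sketch: `Batch` is a stand-in for InitialDataBatch with only the two fields used here, and `SnapshotBuffer` is a hypothetical helper.

```dart
/// Stand-in for the SDK's InitialDataBatch (rows and hasMore only).
class Batch {
  Batch(this.rows, {required this.hasMore});
  final List<Map<String, Object?>> rows;
  final bool hasMore;
}

/// Hypothetical helper: accumulates snapshot batches in arrival order.
class SnapshotBuffer {
  final List<Map<String, Object?>> rows = [];
  bool complete = false;

  /// Adds one batch and returns true once the final batch has been seen.
  bool add(Batch batch) {
    if (complete) {
      throw StateError('snapshot already complete');
    }
    rows.addAll(batch.rows);
    complete = !batch.hasMore;
    return complete;
  }
}
```

In an event loop you would call `add` from the InitialDataBatch case and hydrate the UI when it returns true.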

lastRows

lastRows asks the server to “rewind” and include the last N rows before live changes begin. This is useful for chat timelines or activity feeds.

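```dart
final stream = client.liveEvents(
  // Live queries take the final SQL string; there is no params argument,
  // so the value is written inline instead of as a $1 placeholder.
  "SELECT * FROM chat.messages WHERE room_id = 'main'",
  lastRows: 50,
);
```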

subscriptionId

Provide an explicit subscriptionId if you want stable IDs for client-side bookkeeping:

```dart
final stream = client.liveEvents(
  "SELECT * FROM app.messages WHERE room = 'main'",
  subscriptionId: 'messages-feed',
);
```

from

Use from to resume a subscription from a known sequence ID.

from is the only resume cursor exposed by the Dart API. Commit ordering and snapshot boundaries stay on the server side.

This is most useful when you:

  • persist a checkpoint (last processed seq id) across app restarts
  • want to re-subscribe without re-processing older events

```dart
final stream = client.liveEvents(
  "SELECT * FROM app.messages WHERE room = 'main'",
  from: SeqId.parse('123'),
);
```

For materialized live rows, prefer onCheckpoint so each applied snapshot gives you the latest SeqId directly. getSubscriptions() remains available when you need to inspect the whole shared connection state.
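Persisting the checkpoint across restarts can be as simple as writing the sequence ID string to a file. The sketch below assumes a `dart:io` target; `CheckpointStore` is a hypothetical class, and the file location is up to your app.

```dart
import 'dart:io';

/// Hypothetical file-backed checkpoint store (not an SDK API).
class CheckpointStore {
  CheckpointStore(this.file);
  final File file;

  /// Returns the last saved sequence ID string, or null on first launch.
  String? load() {
    if (!file.existsSync()) return null;
    final text = file.readAsStringSync().trim();
    return text.isEmpty ? null : text;
  }

  /// Writes to a temporary file and then renames it, so a crash mid-write
  /// does not leave a truncated checkpoint behind.
  void save(String seqId) {
    final tmp = File('${file.path}.tmp');
    tmp.writeAsStringSync(seqId, flush: true);
    tmp.renameSync(file.path);
  }
}
```

On startup, a loaded value can be passed through `SeqId.parse(...)` as the `from` argument, and `save` can be called with `checkpoint.lastSeqId.toString()` from an `onCheckpoint` callback.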

When you resume with from, the intended contract is strict resume:

  • rows with _seq <= from must not be replayed
  • rows written while the connection was down must still arrive after reconnect
  • new live rows after reconnect continue on the same stream

The Dart reconnect and resume test suite covers these cases with duplicate _seq assertions so the SDK keeps a strict “no replay, no missed change” boundary.
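To make the "no replay" rule concrete, the sketch below expresses it as a filter over sequence numbers. It uses plain integers in place of SeqId and assumes sequence IDs only increase along the stream; `ResumeGate` is a hypothetical class. The SDK already enforces this boundary in the shared Rust core, so code like this is mainly useful for assertions in your own tests.

```dart
/// Hypothetical illustration of the strict-resume rule, with int
/// standing in for SeqId.
class ResumeGate {
  ResumeGate(this.checkpoint);

  /// Highest sequence number accepted so far.
  int checkpoint;

  /// Returns true and advances the checkpoint when [seq] is new;
  /// returns false for anything at or below the checkpoint (a replay).
  bool accept(int seq) {
    if (seq <= checkpoint) return false;
    checkpoint = seq;
    return true;
  }
}
```

A test that resumes from a known checkpoint can pass every received `_seq` through `accept` and fail on the first `false`.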

Listing active subscriptions

Use getSubscriptions() to inspect all subscriptions on the shared connection.

```dart
final subs = await client.getSubscriptions();
for (final sub in subs) {
  print('${sub.id}: ${sub.query} (lastSeqId=${sub.lastSeqId}, closed=${sub.closed})');
}
```

For server-side observability while debugging, query SELECT * FROM system.live. That view is node-local and reflects the subscriptions currently held in memory on the node you query. It is not persisted to RocksDB and it is not replicated through Raft.

Cancelling a subscription

The SDK returns a Stream<ChangeEvent>. Cancel by cancelling your StreamSubscription.

```dart
final sub = client.liveEvents('SELECT * FROM app.messages').listen((_) {});

// later
await sub.cancel();
```

Cancellation stays lightweight on the Dart side. The SDK cancels the Rust subscription handle on the shared connection, waits briefly for the pull loop to unblock, and then closes the stream. That avoids leaving a hanging Dart-side pull loop after a widget or provider disposes.
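In widget or provider code, cancellation usually lives in a dispose path. The following is a minimal sketch using only `dart:async`; `FeedHolder` is a hypothetical class, and any `Stream` (including one returned by `liveEvents(...)`) can be passed to `start`.

```dart
import 'dart:async';

/// Hypothetical holder that owns one stream subscription, similar to what
/// a widget State or provider might keep.
class FeedHolder<T> {
  StreamSubscription<T>? _sub;
  final List<T> received = [];

  /// Starts listening, replacing any previous subscription.
  void start(Stream<T> stream) {
    _sub?.cancel();
    _sub = stream.listen(received.add);
  }

  /// Call from your dispose path; safe to call more than once.
  Future<void> dispose() async {
    final sub = _sub;
    _sub = null;
    await sub?.cancel();
  }
}
```

Clearing the field before awaiting `cancel()` keeps a second `dispose()` call from cancelling the same subscription twice.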

Data types

Events that carry rows expose each row as a Map<String, KalamCellValue> for type-safe access:

  • InitialDataBatch.rows: List<Map<String, KalamCellValue>>
  • InsertEvent.row / InsertEvent.rows
  • UpdateEvent.columnIndexes / UpdateEvent.row / UpdateEvent.rows / UpdateEvent.oldRow / UpdateEvent.oldRows
  • DeleteEvent.row / DeleteEvent.oldRows

UPDATE event behavior

For UpdateEvent, row and rows are sparse typed maps reconstructed from the Ack schema. They contain only the changed columns plus the primary key columns and _seq.

oldRow contains the previous values for that same sparse column set.

columnIndexes exposes the original positional update layout from the wire protocol. It lines up with the values sent on the Rust bridge raw event if you need exact sparse-column ordering.

```dart
case UpdateEvent(:final row, :final oldRow):
  // row is sparse: changed columns plus primary key columns and _seq.
  // Columns that did not change in this update are absent, so the
  // lookups below return null for them.
  final id = row['id']?.asString();
  final updateType = row['update_type']?.asString();
  final targetId = row['target_id']?.asString();
  print('Trigger: $updateType on $targetId');

  // oldRow holds the previous values for the same sparse column set
  for (final key in oldRow.keys) {
    if (!key.startsWith('_') && key != 'id') {
      print('  changed: $key: ${oldRow[key]} → ${row[key]}');
    }
  }
```

Use the typed accessors to extract values:

```dart
case InsertEvent(:final row):
  final id    = row['id']?.asInt();
  final name  = row['name']?.asString();
  final score = row['score']?.asDouble();
  final flag  = row['active']?.asBool();
```

Event rows arrive already decoded into typed cell values; no manual List<List<dynamic>> parsing is needed.

See Cell Values for the full KalamCellValue API.
