Realtime Subscriptions

The Dart/Flutter SDK supports “live queries”: you subscribe to a SELECT statement and receive a stream of change events over WebSocket.

The important ownership boundary is:

  • Rust owns the shared connection, reconnect loop, replay filtering, and resume checkpoint tracking.
  • Dart owns only the stream wrapper and typed row decoding.

That keeps reconnect behavior consistent with other SDKs and avoids re-implementing replay logic in Flutter apps.

On native Dart targets, subscription traffic uses MessagePack by default on that shared Rust-managed connection, which trims wire size and decode overhead compared with JSON.

Live SQL must follow the strict supported shape: SELECT ... FROM ... WHERE .... Do not include ORDER BY or LIMIT in subscribe() or materialized live-query SQL.
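
A development-time guard can catch unsupported clauses before the SQL reaches subscribe(). The following is a minimal sketch, not part of the SDK: forbiddenClause and looksLikeLiveSql are hypothetical names, and the check is a naive regular expression that will also flag string literals containing the words "limit" or "order by".

```dart
/// Matches the clauses the live-query shape does not accept.
/// Hypothetical helper; naive by design (it does not parse SQL).
final forbiddenClause =
    RegExp(r'\b(order\s+by|limit)\b', caseSensitive: false);

/// Returns true when [sql] starts with SELECT and contains neither
/// ORDER BY nor LIMIT anywhere in the text.
bool looksLikeLiveSql(String sql) {
  final trimmed = sql.trimLeft().toLowerCase();
  return trimmed.startsWith('select') && !forbiddenClause.hasMatch(sql);
}
```

Treat a false result as a prompt to review the query, not as an authoritative validation; the server remains the source of truth for what it accepts.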

Use:

Stream<ChangeEvent> subscribe(
  String sql, {
  int? batchSize,
  int? lastRows,
  SeqId? from,
  String? subscriptionId,
})

For low-level event handling, subscribe(...) is the right API.

Unlike query(...), the current live-query APIs take the final SQL string directly and do not expose a separate params argument.
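
Because the SQL string is final, any runtime value has to be embedded as a literal by the caller. A minimal sketch of one way to do that for string values, assuming the server follows the standard SQL convention of doubling embedded single quotes; sqlString and roomQuery are hypothetical helpers, not SDK functions.

```dart
/// Quotes [value] as a SQL string literal by doubling embedded single quotes.
/// Assumes standard SQL literal escaping applies on the server.
String sqlString(String value) => "'${value.replaceAll("'", "''")}'";

/// Builds the final SQL text for a per-room live query.
String roomQuery(String room) =>
    'SELECT * FROM app.messages WHERE room = ${sqlString(room)}';
```

The result of roomQuery(...) can then be passed as the sql argument. Numeric values should be formatted from typed ints or doubles, never from raw user text.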

The Dart SDK also exposes materialized live-row helpers when you want the SDK to keep the current row set reconciled for you:

  • liveQueryRowsWithSql<T>(sql, ...)
  • liveTableRows<T>(tableName, ...)

Which API to use

Choose the API based on the shape your app actually needs:

  • Use subscribe(...) when you need low-level protocol events like AckEvent, batched snapshots, insert/update/delete diffs, or exact resume control in your own event loop.
  • Use liveQueryRowsWithSql(...) when your UI wants the latest materialized row list for an arbitrary SQL query.
  • Use liveTableRows(...) when SELECT * FROM namespace.table is enough.

For most widget state and list rendering, prefer the materialized helpers. They remove the need to reconcile InitialDataBatch, InsertEvent, UpdateEvent, and DeleteEvent yourself.

Materialized live rows

liveQueryRowsWithSql<T>(...) keeps the current row set updated inside the SDK and emits the latest list whenever the query result changes.

final rowsStream = client.liveQueryRowsWithSql<Map<String, KalamCellValue>>(
  "SELECT id, room, body, created_at FROM app.messages WHERE room = 'main'",
  lastRows: 20,
);

await for (final rows in rowsStream) {
  for (final row in rows) {
    print('${row['id']?.asInt()} ${row['body']?.asString()}');
  }
}

Use liveTableRows<T>(...) when SELECT * FROM table is enough:

final taskRows = client.liveTableRows<Map<String, KalamCellValue>>('app.tasks');

await for (final rows in taskRows) {
  print('tasks: ${rows.length}');
}

If your live query uses a natural or composite key instead of a plain id column, pass keyColumns so reconciliation still stays inside the shared Rust core:

final messages = client.liveQueryRowsWithSql<Map<String, KalamCellValue>>(
  "SELECT room_id, message_id, body FROM app.messages WHERE room_id = 'main'",
  keyColumns: ['room_id', 'message_id'],
);

You can also cap the in-memory materialized row set with limit and map rows directly into app models with mapRow.

Materialized rows vs change events

  • subscribe(...) gives you protocol-level events and incremental changes.
  • liveQueryRowsWithSql(...) and liveTableRows(...) give you the fully reconciled current row list.
  • Materialized helpers are usually the better default for UI code.
  • Low-level events are better when you need custom reconciliation, audit logging, or protocol inspection.

Materialized-row limits

The SQL for materialized live rows still follows the same strict supported shape:

  • SELECT ... FROM ... WHERE ...
  • no ORDER BY
  • no LIMIT

If you need presentation sorting, grouping, or capping, do that after rows arrive in Dart.
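
As an illustration of client-side ordering and capping, here is a minimal sketch. It assumes rows have already been mapped into a hypothetical Message model (for example through mapRow); Message and newestFirst are illustrative names, not SDK types.

```dart
/// Hypothetical app model; in a real app this might be produced by `mapRow`.
class Message {
  const Message(this.id, this.body, this.createdAt);
  final int id;
  final String body;
  final DateTime createdAt;
}

/// Returns the [max] newest messages, newest first.
/// Sorts a copy, so the list emitted by the stream is left untouched.
List<Message> newestFirst(List<Message> rows, {int max = 20}) {
  final sorted = [...rows]
    ..sort((a, b) => b.createdAt.compareTo(a.createdAt));
  return sorted.take(max).toList();
}
```

Sorting a copy matters when the same emitted list is shared with other listeners or cached for diffing.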

Concurrency notes

The Dart SDK test suite covers multi-client fan-out and many simultaneous subscriptions on one shared client connection. That means the supported path is not just single-subscriber demos: concurrent writers and multiple active listeners on the same query are part of the exercised contract.

Basic subscription

final stream = client.subscribe(
  "SELECT * FROM app.messages WHERE room = 'main'",
);

await for (final event in stream) {
  switch (event) {
    case AckEvent(:final subscriptionId, :final schema, :final totalRows):
      print('ack $subscriptionId: $totalRows rows, columns=${schema.length}');
    case InitialDataBatch(:final rows, :final hasMore):
      print('snapshot batch: ${rows.length}, hasMore=$hasMore');
    case InsertEvent(:final row):
      print('insert: ${row['id']?.asInt()}');
    case UpdateEvent(:final row, :final oldRow):
      print('update: $oldRow -> $row');
    case DeleteEvent(:final row):
      print('delete: $row');
    case SubscriptionError(:final code, :final message):
      print('subscription error [$code]: $message');
  }
}

Snapshot vs live changes

Most subscriptions follow this pattern:

  1. AckEvent confirms the subscription and includes schema metadata.
  2. One or more InitialDataBatch events deliver the initial snapshot.
  3. InsertEvent / UpdateEvent / DeleteEvent deliver live changes.

If you only care about live changes, you can ignore InitialDataBatch after initial UI hydration.
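
To make the snapshot-then-changes sequence concrete, the sketch below shows the kind of reconciliation an app would write on top of low-level events. It is self-contained and uses stand-in types (DemoEvent and its subclasses, RowSet), not the SDK's event classes; it also assumes a single id primary key and plain Object? values instead of KalamCellValue.

```dart
/// Stand-in event types for illustration only; the real SDK events carry
/// Map<String, KalamCellValue> rows and additional fields.
sealed class DemoEvent {}

class DemoSnapshot extends DemoEvent {
  DemoSnapshot(this.rows, {required this.hasMore});
  final List<Map<String, Object?>> rows;
  final bool hasMore;
}

class DemoInsert extends DemoEvent {
  DemoInsert(this.row);
  final Map<String, Object?> row;
}

class DemoUpdate extends DemoEvent {
  DemoUpdate(this.row);
  final Map<String, Object?> row; // sparse: key plus changed columns
}

class DemoDelete extends DemoEvent {
  DemoDelete(this.row);
  final Map<String, Object?> row;
}

/// Keeps a keyed view of the current row set, assuming an `id` primary key.
class RowSet {
  final Map<Object?, Map<String, Object?>> _byId = {};

  /// True once the final snapshot batch (hasMore == false) has been applied.
  bool hydrated = false;

  void apply(DemoEvent event) {
    switch (event) {
      case DemoSnapshot(:final rows, :final hasMore):
        for (final row in rows) {
          _byId[row['id']] = Map.of(row);
        }
        hydrated = !hasMore;
      case DemoInsert(:final row):
        _byId[row['id']] = Map.of(row);
      case DemoUpdate(:final row):
        // Merge the sparse update over the cached full row.
        _byId.update(row['id'], (old) => {...old, ...row},
            ifAbsent: () => Map.of(row));
      case DemoDelete(:final row):
        _byId.remove(row['id']);
    }
  }

  List<Map<String, Object?>> get rows => _byId.values.toList();
}
```

This is the bookkeeping the materialized helpers perform for you, which is why they are the simpler default for UI code.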

Controlling the initial snapshot

batchSize

If the initial query returns many rows, the server can send it in batches. batchSize controls the maximum rows per snapshot batch.

final stream = client.subscribe(
  'SELECT * FROM app.large_table',
  batchSize: 100,
);

You will receive zero or more InitialDataBatch events with hasMore=true, then a final batch with hasMore=false, and then live changes.

lastRows

lastRows asks the server to “rewind” and include the last N rows before live changes begin. This is useful for chat timelines or activity feeds.

final stream = client.subscribe(
  // Live-query APIs take no params argument, so the value is inlined.
  "SELECT * FROM chat.messages WHERE room_id = 'main'",
  lastRows: 50,
);

subscriptionId

Provide an explicit subscriptionId if you want stable IDs for client-side bookkeeping:

final stream = client.subscribe(
  "SELECT * FROM app.messages WHERE room = 'main'",
  subscriptionId: 'messages-feed',
);

from

Use from to resume a subscription from a known sequence ID.

This is most useful when you:

  • persist a checkpoint (last processed seq id) across app restarts
  • want to re-subscribe without re-processing older events

final stream = client.subscribe(
  "SELECT * FROM app.messages WHERE room = 'main'",
  from: SeqId.parse('123'),
);

To discover the latest seq_id seen by the client, use getSubscriptions() and read SubscriptionInfo.lastSeqId.

When you resume with from, the intended contract is strict resume:

  • rows with _seq <= from must not be replayed
  • rows written while the connection was down must still arrive after reconnect
  • new live rows after reconnect continue on the same stream

The Dart reconnect and resume test suite covers these cases with duplicate _seq assertions so the SDK keeps a strict “no replay, no missed change” boundary.
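
Replay filtering itself lives in the Rust core, but an app that persists its own checkpoint still needs to decide which value to store. The sketch below shows one way to track the last processed sequence number; Checkpoint is a hypothetical class, and plain int values stand in for the SDK's SeqId type. Persistence (file, preferences, database) is left out.

```dart
/// Tracks the highest sequence number the app has fully processed.
/// Uses plain ints as a stand-in for the SDK's SeqId type.
class Checkpoint {
  Checkpoint([this.lastSeq]);

  /// Null until the first event has been processed.
  int? lastSeq;

  /// Records [seq] and returns true if it is newer than the checkpoint;
  /// returns false, leaving the checkpoint unchanged, if it is a replay.
  bool advance(int seq) {
    final last = lastSeq;
    if (last != null && seq <= last) return false;
    lastSeq = seq;
    return true;
  }
}
```

Call advance(...) only after the event has been fully handled, so a crash mid-processing resumes from the previous checkpoint and does not skip the event.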

Listing active subscriptions

Use getSubscriptions() to inspect all subscriptions on the shared connection.

final subs = await client.getSubscriptions();

for (final sub in subs) {
  print('${sub.id}: ${sub.query} (lastSeqId=${sub.lastSeqId}, closed=${sub.closed})');
}

For server-side observability while debugging, query SELECT * FROM system.live. That view is node-local and reflects the subscriptions currently held in memory on the node you query. It is not persisted to RocksDB and it is not replicated through Raft.

Cancelling a subscription

The SDK returns a Stream<ChangeEvent>. Cancel by cancelling your StreamSubscription.

final sub = client.subscribe('SELECT * FROM app.messages').listen((_) {});

// later
await sub.cancel();

Cancellation stays lightweight on the Dart side. The SDK cancels the Rust subscription handle on the shared connection, waits briefly for the pull loop to unblock, and then closes the stream. That avoids leaving a hanging Dart-side pull loop after a widget or provider disposes.
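
For widget or provider code, it can help to wrap the StreamSubscription in an owner with an idempotent dispose. A minimal sketch using only dart:async; FeedController is a hypothetical class, and its source parameter stands in for the stream returned by subscribe(...).

```dart
import 'dart:async';

/// Owns one stream subscription and releases it when disposed.
/// `source` stands in for the stream returned by `client.subscribe(...)`.
class FeedController<T> {
  FeedController(Stream<T> source, void Function(T event) onEvent)
      : _sub = source.listen(onEvent);

  final StreamSubscription<T> _sub;
  bool _disposed = false;

  /// Safe to call more than once; only the first call cancels.
  Future<void> dispose() async {
    if (_disposed) return;
    _disposed = true;
    await _sub.cancel();
  }
}
```

Awaiting dispose() from a State.dispose or provider teardown path gives the cancellation a chance to complete before dependent resources are released.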

Data types

All subscription events expose rows as Map<String, KalamCellValue> for type-safe access:

  • InitialDataBatch.rows: List<Map<String, KalamCellValue>>
  • InsertEvent.row / InsertEvent.rows
  • UpdateEvent.columnIndexes / UpdateEvent.row / UpdateEvent.oldRow / UpdateEvent.oldRows
  • DeleteEvent.row / DeleteEvent.oldRows

UPDATE event behavior

For UpdateEvent, row and rows are sparse typed maps reconstructed from the Ack schema. They contain only the changed columns plus the primary key columns and _seq.

oldRow contains the previous values for that same sparse column set.

columnIndexes exposes the original positional update layout from the wire protocol. It lines up with the values sent on the Rust bridge raw event if you need exact sparse-column ordering.
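
The relationship between a positional update and the sparse map can be sketched as follows. This is an illustration under assumptions, not the SDK's implementation: it assumes columnIndexes are zero-based positions into the Ack schema's column order and that the values list is parallel to columnIndexes. sparseRow is a hypothetical function, and plain Object? values stand in for KalamCellValue.

```dart
/// Rebuilds a sparse row from a positional update.
/// [schema] is the ordered column-name list (as an Ack might report it);
/// [columnIndexes] and [values] are assumed to be parallel lists.
Map<String, Object?> sparseRow(
  List<String> schema,
  List<int> columnIndexes,
  List<Object?> values,
) {
  if (columnIndexes.length != values.length) {
    throw ArgumentError('columnIndexes and values must have equal length');
  }
  return {
    for (var i = 0; i < columnIndexes.length; i++)
      schema[columnIndexes[i]]: values[i],
  };
}
```

Columns that were not part of the update are simply absent from the resulting map, which matches the sparse behavior described for row and oldRow.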

case UpdateEvent(:final row, :final oldRow):
  // row is sparse: changed columns plus primary key columns and _seq.
  // Accessors for columns that did not change return null.
  final id = row['id']?.asString();
  final updateType = row['update_type']?.asString();
  final targetId = row['target_id']?.asString();
  print('Trigger: $updateType on $targetId (id=$id)');

  // oldRow holds the previous values for the same sparse column set
  for (final key in oldRow.keys) {
    if (!key.startsWith('_') && key != 'id') {
      print('  changed: $key: ${oldRow[key]} -> ${row[key]}');
    }
  }

Use the typed accessors to extract values:

case InsertEvent(:final row):
  final id = row['id']?.asInt();
  final name = row['name']?.asString();
  final score = row['score']?.asDouble();
  final flag = row['active']?.asBool();

These accessors operate on rows the SDK has already decoded from the wire payload; no manual List<List<dynamic>> parsing is needed.

See Cell Values for the full KalamCellValue API.
