Realtime Subscriptions
The Dart/Flutter SDK supports “live queries”: you subscribe to a SELECT statement and receive a stream of change events over WebSocket.
Use:
Stream<ChangeEvent> subscribe(
  String sql, {
  int? batchSize,
  int? lastRows,
  String? subscriptionId,
})
Basic subscription
final stream = client.subscribe(
  'SELECT * FROM app.messages ORDER BY created_at DESC LIMIT 50',
);
await for (final event in stream) {
  switch (event) {
    case AckEvent(:final subscriptionId, :final schema, :final totalRows):
      print('ack $subscriptionId: $totalRows rows, columns=${schema.length}');
    case InitialDataBatch(:final rows, :final hasMore):
      print('snapshot batch: ${rows.length}, hasMore=$hasMore');
    case InsertEvent(:final row):
      print('insert: $row');
    case UpdateEvent(:final row, :final oldRow):
      print('update: $oldRow -> $row');
    case DeleteEvent(:final row):
      print('delete: $row');
    case SubscriptionError(:final code, :final message):
      print('subscription error [$code]: $message');
  }
}
Snapshot vs live changes
Most subscriptions follow this pattern:
- AckEvent confirms the subscription and includes schema metadata.
- One or more InitialDataBatch events deliver the initial snapshot.
- InsertEvent / UpdateEvent / DeleteEvent deliver live changes.
If you only care about live changes, you can ignore InitialDataBatch after initial UI hydration.
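The snapshot-then-live sequence described above can be folded into a single piece of local state. The following is a minimal sketch, not the SDK's own helper: the event classes are stand-ins that only mirror the names used in this guide, and the LiveRows class and the 'id' key column are hypothetical.

```dart
// Stand-in event types mirroring the names in this guide (assumptions for
// illustration only; in a real app, import the SDK's own classes instead).
sealed class ChangeEvent {}

class InitialDataBatch extends ChangeEvent {
  InitialDataBatch(this.rows, {required this.hasMore});
  final List<Map<String, dynamic>> rows;
  final bool hasMore;
}

class InsertEvent extends ChangeEvent {
  InsertEvent(this.row);
  final Map<String, dynamic> row;
}

class UpdateEvent extends ChangeEvent {
  UpdateEvent(this.row, this.oldRow);
  final Map<String, dynamic> row;
  final Map<String, dynamic> oldRow;
}

class DeleteEvent extends ChangeEvent {
  DeleteEvent(this.row);
  final Map<String, dynamic> row;
}

/// Hypothetical helper: keeps a local copy of the query result, keyed by a
/// primary-key column.
class LiveRows {
  LiveRows({this.keyColumn = 'id'});

  final String keyColumn; // assumed primary-key column name
  final Map<Object?, Map<String, dynamic>> byKey = {};
  bool hydrated = false; // true once the final snapshot batch has arrived

  void apply(ChangeEvent event) {
    switch (event) {
      case InitialDataBatch(:final rows, :final hasMore):
        for (final row in rows) {
          byKey[row[keyColumn]] = row;
        }
        if (!hasMore) hydrated = true;
      case InsertEvent(:final row):
        byKey[row[keyColumn]] = row;
      case UpdateEvent(:final row, :final oldRow):
        // Drop the old entry only if the key column itself changed.
        final oldKey = oldRow[keyColumn];
        final newKey = row[keyColumn];
        if (oldKey != newKey) byKey.remove(oldKey);
        byKey[newKey] = row;
      case DeleteEvent(:final row):
        byKey.remove(row[keyColumn]);
    }
  }
}

void main() {
  final state = LiveRows();
  final events = <ChangeEvent>[
    InitialDataBatch([{'id': 1, 'text': 'hello'}], hasMore: true),
    InitialDataBatch([{'id': 2, 'text': 'world'}], hasMore: false),
    InsertEvent({'id': 3, 'text': 'new'}),
    UpdateEvent({'id': 1, 'text': 'hello!'}, {'id': 1, 'text': 'hello'}),
    DeleteEvent({'id': 2, 'text': 'world'}),
  ];
  for (final event in events) {
    state.apply(event);
  }
  print(state.byKey.length); // 2
  print(state.hydrated); // true
}
```

A UI can render from byKey once hydrated is true and simply re-render after each later event. AckEvent and SubscriptionError are left out of the stand-ins to keep the sketch short.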
Controlling the initial snapshot
batchSize
If the initial query returns many rows, the server can send it in batches. batchSize controls the maximum rows per snapshot batch.
final stream = client.subscribe(
  'SELECT * FROM app.large_table',
  batchSize: 100,
);
You will receive one or more InitialDataBatch events with hasMore=true, followed by a final batch with hasMore=false, and then live changes.
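As an illustration of the hasMore flag, the sketch below gathers every snapshot batch into one list and stops at the final batch. The event classes are stand-ins that mirror the names in this guide, collectSnapshot is a hypothetical helper, and the stream is faked with Stream.fromIterable rather than coming from the SDK.

```dart
// Stand-in event types (assumptions for illustration; the SDK provides the
// real ones).
sealed class ChangeEvent {}

class InitialDataBatch extends ChangeEvent {
  InitialDataBatch(this.rows, {required this.hasMore});
  final List<Map<String, dynamic>> rows;
  final bool hasMore;
}

class InsertEvent extends ChangeEvent {
  InsertEvent(this.row);
  final Map<String, dynamic> row;
}

/// Hypothetical helper: collects snapshot rows until the batch with
/// hasMore == false arrives, then stops listening.
Future<List<Map<String, dynamic>>> collectSnapshot(
  Stream<ChangeEvent> events,
) async {
  final rows = <Map<String, dynamic>>[];
  await for (final event in events) {
    if (event is InitialDataBatch) {
      rows.addAll(event.rows);
      // Leaving the loop cancels the underlying stream subscription.
      if (!event.hasMore) break;
    }
  }
  return rows;
}

Future<void> main() async {
  // Fake stream standing in for client.subscribe(...).
  final fake = Stream<ChangeEvent>.fromIterable([
    InitialDataBatch([{'id': 1}, {'id': 2}], hasMore: true),
    InitialDataBatch([{'id': 3}], hasMore: false),
    InsertEvent({'id': 4}),
  ]);
  final snapshot = await collectSnapshot(fake);
  print(snapshot.length); // 3
}
```

Because breaking out of await for cancels the subscription, this shape only suits a one-off read of the snapshot. To keep receiving live changes, accumulate the batches inside the same listener that handles the later events.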
lastRows
lastRows asks the server to “rewind” and include the last N rows before live changes begin. This is useful for chat timelines or activity feeds.
final stream = client.subscribe(
  // Raw string (r'...') so that $1 is not parsed as Dart string interpolation.
  r'SELECT * FROM chat.messages WHERE room_id = $1 ORDER BY created_at ASC',
  lastRows: 50,
);
subscriptionId
Provide an explicit subscriptionId if you want stable IDs for client-side bookkeeping:
final stream = client.subscribe(
  'SELECT * FROM app.messages ORDER BY created_at DESC LIMIT 50',
  subscriptionId: 'messages-feed',
);
Cancelling a subscription
The SDK returns a Stream<ChangeEvent>. Cancel by cancelling your StreamSubscription.
final sub = client.subscribe('SELECT * FROM app.messages').listen((_) {});
// later
await sub.cancel();
Data types
Subscription rows arrive as JSON objects. Each event exposes convenient parsed maps:
- InitialDataBatch.rows → List<Map<String, dynamic>>
- InsertEvent.row / InsertEvent.rows
- UpdateEvent.row / UpdateEvent.oldRow
- DeleteEvent.row
If you need typed models, map from the returned Map<String, dynamic> to your own classes.
When using typed models, treat incoming values as untrusted input: validate required fields, handle nulls, and tolerate schema changes during development.
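One way to apply this advice is a mapper that returns null for rows that do not match the expected shape. This is a sketch under stated assumptions: the Message class, its tryFromRow method, and the column names (id, body, created_at) are hypothetical and not part of the SDK.

```dart
/// Hypothetical model for one row; the column names are assumptions.
class Message {
  Message({required this.id, required this.body, this.createdAt});

  final int id;
  final String body;
  final DateTime? createdAt; // optional column, may be missing or null

  /// Returns null instead of throwing when required fields are missing or
  /// have the wrong type, so one bad row cannot break the whole stream.
  static Message? tryFromRow(Map<String, dynamic> row) {
    final id = row['id'];
    final body = row['body'];
    if (id is! int || body is! String) return null;

    // Optional field: accept an ISO 8601 string, otherwise fall back to null.
    final rawCreatedAt = row['created_at'];
    final createdAt =
        rawCreatedAt is String ? DateTime.tryParse(rawCreatedAt) : null;

    // Unknown extra columns are ignored, which keeps the mapper tolerant of
    // columns added during development.
    return Message(id: id, body: body, createdAt: createdAt);
  }
}

void main() {
  final ok = Message.tryFromRow({
    'id': 1,
    'body': 'hi',
    'created_at': '2024-01-01T00:00:00Z',
    'extra': true,
  });
  final bad = Message.tryFromRow({'id': 'not-an-int', 'body': null});
  print(ok?.body); // hi
  print(bad); // null
}
```

Returning null keeps the decision about invalid rows (skip, log, or surface an error) with the caller. Throwing a FormatException would be a reasonable alternative if invalid rows should stop processing.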