SQL Topics & Consumer Groups

Use SQL to create topics, attach source tables, inspect offsets, and do simple topic reads inside KalamDB.

For production workers, the HTTP or SDK consume APIs are still the better fit when you need explicit ack timing and partition selection. The SQL surface is best for setup, inspection, debugging, and simple single-partition flows. SQL consumer groups also resume from persisted offsets, and a grouped SQL consume requires an explicit ACK after the batch is processed.

Why use groups

Use GROUP when a worker should keep its place in a topic.

Without GROUP, a consume is stateless. Every call starts from the FROM position you requested, so it is good for inspection and replay but not for a durable worker cursor.

With GROUP, KalamDB stores progress in system.topic_offsets for (topic_id, group_id, partition_id). That gives you:

  • resume-after-restart behavior for the same logical worker or worker fleet
  • independent read positions for different consumers on the same topic
  • a way to replay with one group while production workers keep their own offsets

There is no separate CREATE GROUP command. A group is created the first time you consume or acknowledge offsets with a new group_id.
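
To confirm that a group has stored progress, you can look for its rows in system.topic_offsets. This is a minimal sketch, assuming the system table accepts an ordinary WHERE filter. The group id is the illustrative one used elsewhere on this page, and the columns are the ones listed under Inspect topic state:

```sql
-- Look up persisted progress for one group.
-- Assumption: system tables support standard WHERE predicates.
SELECT topic_id, group_id, partition_id, last_acked_offset, updated_at
FROM system.topic_offsets
WHERE group_id = 'ui-debug-admin';
```

An empty result would mean no offset has been persisted for that group yet.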

Stateless reads vs durable reads

Use no group when you want a pure viewer:

sql snippetSQL
CONSUME FROM demo.message_events FROM EARLIEST LIMIT 100;

That query does not store progress. Run it again and you can replay the same range again.

Use a group when you want a named cursor:

sql snippetSQL
CONSUME FROM demo.message_events
  GROUP 'ui-debug-admin'
  FROM LATEST
  LIMIT 100;

That query creates or resumes the ui-debug-admin cursor for partition 0.

Use a single-partition topic for now. SQL CONSUME reads partition 0 only, and multi-partition behavior is not finished yet:

sql snippetSQL
CREATE TOPIC app.user_events PARTITIONS 1;

CREATE TOPIC requirements today:

  • topic names must be namespace-qualified, like app.user_events
  • the namespace must already exist
  • CREATE TOPIC IF NOT EXISTS is not currently supported
  • topic creation requires dba or system role

CREATE TOPIC

sql snippetSQL
CREATE TOPIC <topic_name>
  [PARTITIONS <count>]
  [WITH (retention_seconds = <seconds|NULL>, retention_max_bytes = <bytes|NULL>)];

When a topic is created, KalamDB persists it in system.topics and adds it to the in-memory route cache immediately.

Retention options on CREATE TOPIC are optional:

  • omit an option to use the server [topics] default
  • set an option to NULL to disable that limit for this topic
  • retention_max_bytes is enforced per partition, not across the whole topic

Examples:

sql snippetSQL
CREATE TOPIC app.user_events PARTITIONS 1;

CREATE TOPIC app.user_events
  PARTITIONS 1
  WITH (retention_seconds = 86400, retention_max_bytes = 268435456);

CREATE TOPIC app.audit_events
  PARTITIONS 1
  WITH (retention_seconds = 3600, retention_max_bytes = NULL);

In the last example, messages are deleted once they are older than one hour, and there is no byte cap for the topic partitions.

Topic retention

Topic retention is a topic-level policy stored in system.topics. It is independent of consumer group ack state.

Current behavior:

  • retention_seconds deletes messages older than the configured age
  • retention_max_bytes trims the oldest retained messages until the partition is back under the byte cap
  • if both limits are active, age-based pruning runs first and byte-based pruning runs after that
  • if both limits are NULL, retention is disabled for the topic
  • retained message deletion does not rewrite offsets or move consumer group cursors

The default server configuration is 7 days (604800 seconds) and 1 GiB (1073741824 bytes) per partition unless you override it in the [topics] config block:

toml snippetTOML
[topics]
default_retention_seconds = 604800
default_retention_max_bytes = 1073741824
retention_check_interval_seconds = 3600
retention_batch_size = 10000

ALTER TOPIC retention

Use ALTER TOPIC when you want to change retention after the topic already exists.

sql snippetSQL
ALTER TOPIC <topic_name>
  SET RETENTION WITH (retention_seconds = <seconds|NULL>, retention_max_bytes = <bytes|NULL>);

ALTER TOPIC <topic_name> CLEAR RETENTION;

SET RETENTION only changes the options you specify. Omitted options keep their current value.

Examples:

sql snippetSQL
ALTER TOPIC app.user_events
  SET RETENTION WITH (retention_seconds = 604800, retention_max_bytes = 1073741824);

ALTER TOPIC app.user_events
  SET RETENTION WITH (retention_seconds = NULL);

ALTER TOPIC app.user_events
  SET RETENTION WITH (retention_max_bytes = 536870912);

ALTER TOPIC app.user_events CLEAR RETENTION;

Use CLEAR RETENTION when you want to disable both limits in one step.

How retention affects reads

Retention removes stored messages, but it does not renumber topic offsets.

  • CONSUME FROM ... FROM EARLIEST starts at the earliest currently retained offset
  • explicit offsets below that low watermark fail because those messages are no longer available
  • a consumer group with a committed offset below the earliest retained offset must be reset before it can continue

Example recovery flow for a lagged group:

sql snippetSQL
RESET CONSUMER GROUP 'backfill-worker' ON app.user_events TO 250;

CONSUME FROM app.user_events
  GROUP 'backfill-worker'
  FROM EARLIEST
  LIMIT 100;

That reset changes the next offset for the group. In this example, 250 stands for an offset at or above the earliest retained offset. The FROM clause in later grouped reads does not override an already-committed group cursor.

ADD SOURCE

sql snippetSQL
ALTER TOPIC <topic_name>
  ADD SOURCE <table_name_or_namespace.table_name>
  ON <INSERT|UPDATE|DELETE>
  [WHERE <filter_expression>]
  [WITH (payload = '<key|full|diff>')];

The parser accepts key, full, and diff, but only full is supported today. key and diff are still in development.

Current route behavior:

  • a route matches by source table and operation
  • one changed row becomes one topic message
  • route registration updates system.topics and the in-memory route cache immediately
  • ALTER TOPIC ... ADD SOURCE requires dba or system role

You can add separate routes for INSERT, UPDATE, and DELETE:

sql snippetSQL
ALTER TOPIC app.user_events ADD SOURCE app.users ON INSERT WITH (payload = 'full');
ALTER TOPIC app.user_events ADD SOURCE app.users ON UPDATE WITH (payload = 'full');
ALTER TOPIC app.user_events ADD SOURCE app.users ON DELETE WITH (payload = 'full');
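
Since route registration updates system.topics immediately, one way to check the routes you just added is to read the routes column back. This is a sketch, assuming topic_id holds the qualified topic name and that system tables accept a WHERE filter:

```sql
-- Read back the routes registered on one topic.
-- Assumption: topic_id is stored as the namespace-qualified name.
SELECT topic_id, routes, updated_at
FROM system.topics
WHERE topic_id = 'app.user_events';
```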

Payload modes

Only payload = 'full' has behavior you should rely on today. key and diff are still in development:

sql snippetSQL
ALTER TOPIC app.user_events
  ADD SOURCE app.users
  ON INSERT
  WITH (payload = 'full');

What those modes mean in the current code:

Mode | Current behavior
key  | In development. The parser accepts it, but current publishing code still serializes row JSON without a stable primary-key-only payload shape.
full | Publishes the row JSON and injects _table with the source table id.
diff | In development. The parser accepts it, but current publishing code still publishes the same full-row payload as full.

If you want predictable payloads today, use payload = 'full'. Treat key and diff as in development.

WHERE filters

This syntax is accepted:

sql snippetSQL
ALTER TOPIC app.user_events
  ADD SOURCE app.users
  ON INSERT
  WHERE is_active = true
  WITH (payload = 'full');

The filter expression is stored in topic metadata, but the current publisher path does not evaluate it yet. Do not rely on WHERE to suppress messages today.

CONSUME FROM

sql snippetSQL
CONSUME FROM <topic_name>
  [GROUP '<group_id>']
  [FROM <LATEST|EARLIEST|offset>]
  [LIMIT <count>];

Examples:

sql snippetSQL
CONSUME FROM app.user_events FROM EARLIEST LIMIT 10;

CONSUME FROM app.user_events
  GROUP 'ai-worker'
  FROM EARLIEST
  LIMIT 10;

Current SQL consume behavior:

  • SQL CONSUME reads partition 0 only
  • the result columns are topic_id, partition_id, offset, key, payload, timestamp_ms, user_id, and op
  • payload is a binary column containing the stored message bytes
  • user_id is the canonical producing user id when the message came from a USER-table scoped write
  • without GROUP, the read is stateless
  • with GROUP, KalamDB first checks system.topic_offsets and resumes from the committed offset when one exists
  • if a group has no committed offset yet, the FROM clause decides the initial position
  • with GROUP, the returned range is claimed in memory until it is acknowledged or the visibility timeout expires
  • use ACK to persist progress after processing a grouped batch

If you need explicit ack timing or partition selection, use the HTTP Topic API.

New group vs existing group

The important rule is:

  • for a brand-new group, FROM decides where the first read starts
  • for a group that already has a committed offset, KalamDB resumes from that offset and ignores FROM

Examples:

New group that tails only future messages:

sql snippetSQL
CONSUME FROM demo.message_events
  GROUP 'ui-debug-admin'
  FROM LATEST
  LIMIT 100;

Use that when you create a fresh debug group and only want messages written after the group starts.

New group that replays the backlog:

sql snippetSQL
CONSUME FROM demo.message_events
  GROUP 'backfill-worker'
  FROM EARLIEST
  LIMIT 100;

Use that when a new worker should process existing messages from the beginning.

Existing group that already consumed part of the topic:

sql snippetSQL
CONSUME FROM demo.message_events
  GROUP 'backfill-worker'
  FROM LATEST
  LIMIT 100;

Even though this query says FROM LATEST, KalamDB resumes backfill-worker from its committed offset. The group already has a cursor, so the FROM clause no longer changes the starting point.

If you created a new group on the fly and expected to see existing messages, do not use FROM LATEST. Use FROM EARLIEST or FROM <offset> for that first consume.

How to use groups safely

Use the same group id for one logical worker fleet:

sql snippetSQL
CONSUME FROM app.user_events GROUP 'email-worker' FROM EARLIEST LIMIT 50;

Use a different group when you want an independent cursor:

sql snippetSQL
CONSUME FROM app.user_events GROUP 'audit-replay' FROM EARLIEST LIMIT 50;

Use no group when you want a viewer that never advances offsets:

sql snippetSQL
CONSUME FROM app.user_events FROM EARLIEST LIMIT 50;

Use a throwaway group for one-off debugging when you want group semantics without affecting a real worker group:

sql snippetSQL
CONSUME FROM app.user_events
  GROUP 'debug-2026-04-24-admin'
  FROM EARLIEST
  LIMIT 50;

That lets you inspect with a separate durable cursor while production groups keep their own positions.

ACK

sql snippetSQL
ACK <topic_name>
  GROUP '<group_id>'
  [PARTITION <partition_id>]
  UPTO OFFSET <offset>;

Current ack behavior:

  • ACK writes to system.topic_offsets
  • acknowledgements are monotonic, so a lower offset does not move a group backward
  • ACK is per topic, group, and partition
  • ACK requires service, dba, or system role
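
The monotonic rule can be illustrated with two acknowledgements in a row. This is a sketch using the ACK and RESET CONSUMER GROUP syntax documented on this page. The group name and offsets are illustrative, and whether the lower ACK is silently ignored or reported is not specified here:

```sql
-- Persist progress up to offset 120 on partition 0.
ACK app.user_events GROUP 'email-worker' PARTITION 0 UPTO OFFSET 120;

-- A later ACK with a lower offset does not move the group backward.
ACK app.user_events GROUP 'email-worker' PARTITION 0 UPTO OFFSET 80;

-- To move the cursor backward on purpose, reset the group instead.
RESET CONSUMER GROUP 'email-worker' ON app.user_events PARTITION 0 TO 80;
```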

If you are using SQL CONSUME ... GROUP, issue ACK after you finish processing the returned rows. If the caller does not ACK before the topic visibility timeout, the range can be delivered again.
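
A grouped SQL worker loop could look like the sketch below. It assumes UPTO OFFSET takes the highest offset you finished processing, which the last_acked_offset column name suggests but this page does not state. The offset 49 is a placeholder for the highest offset in the returned batch:

```sql
-- 1. Claim a batch for the group (SQL CONSUME reads partition 0).
CONSUME FROM app.user_events
  GROUP 'email-worker'
  FROM EARLIEST
  LIMIT 50;

-- 2. Process the returned rows in your application and note the
--    highest offset value you handled.

-- 3. Persist progress. 49 is a placeholder, not a fixed value.
ACK app.user_events
  GROUP 'email-worker'
  UPTO OFFSET 49;
```

If step 3 is skipped, the same range can come back after the visibility timeout, so processing should tolerate repeats.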

RESET CONSUMER GROUP

sql snippetSQL
RESET CONSUMER GROUP '<group_id>'
  ON <topic_name>
  [PARTITION <partition_id>]
  TO <next_offset>;

Use reset when you deliberately want to move a durable group cursor. This is an admin-only operation and affects only the named topic, group, and partition.

Replay a debug group from the beginning:

sql snippetSQL
RESET CONSUMER GROUP 'ui-debug-admin1' ON blog.summarizer TO 0;

CONSUME FROM blog.summarizer
  GROUP 'ui-debug-admin1'
  FROM EARLIEST
  LIMIT 100;

Move a group so the next read starts at offset 250:

sql snippetSQL
RESET CONSUMER GROUP 'backfill-worker' ON app.user_events PARTITION 0 TO 250;

For inspection without moving or resetting any group, omit GROUP:

sql snippetSQL
CONSUME FROM blog.summarizer FROM 0 LIMIT 100;

CLEAR TOPIC

sql snippetSQL
CLEAR TOPIC <topic_name>;

CLEAR TOPIC keeps the topic definition but schedules a background cleanup job to delete stored messages and consumer offsets.
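
As a sketch, clearing a topic and then confirming that its definition is still present might look like this. The WHERE filter and the topic_id value format are assumptions:

```sql
-- Schedule deletion of stored messages and consumer offsets.
CLEAR TOPIC app.user_events;

-- The topic definition should still be listed afterwards.
-- Assumption: topic_id is stored as the namespace-qualified name.
SELECT topic_id, partitions, routes
FROM system.topics
WHERE topic_id = 'app.user_events';
```

Because the cleanup runs in the background, messages and offsets may remain visible for a short time after the statement returns.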

DROP TOPIC

sql snippetSQL
DROP TOPIC <topic_name>;

DROP TOPIC removes the topic definition and route cache entry immediately, then schedules the same background cleanup job for stored messages and offsets.
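
A sketch of dropping a topic and checking what is left behind, under the same assumptions about WHERE filters and the topic_id format:

```sql
-- Remove the topic definition and its route cache entry.
DROP TOPIC app.audit_events;

-- Offsets for the dropped topic may still appear here until the
-- background cleanup job has run.
SELECT topic_id, group_id, partition_id, last_acked_offset
FROM system.topic_offsets
WHERE topic_id = 'app.audit_events';
```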

Inspect topic state

Topic metadata and consumer progress are queryable:

sql snippetSQL
SELECT topic_id, partitions, routes, created_at, updated_at
FROM system.topics
ORDER BY topic_id;

SELECT topic_id, group_id, partition_id, last_acked_offset, updated_at
FROM system.topic_offsets
ORDER BY topic_id, group_id, partition_id;

The message log itself is internal. You inspect messages with CONSUME, not by querying a system.topic_messages table.

Practical guidance

  • use PARTITIONS 1
  • use payload = 'full'; it is the only payload mode with fully documented behavior today
  • use a separate source table and result table for agent flows so you do not create loops
  • treat key and diff as still in development, and treat route WHERE and custom partitioning as incomplete for now
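
Put together, a setup that follows this guidance could look like the sketch below. The names app.job_events, app.jobs, and app.job_results are hypothetical. The app namespace and both tables are assumed to exist already:

```sql
-- Single-partition topic.
CREATE TOPIC app.job_events PARTITIONS 1;

-- Route only inserts on the source table. The worker writes its output
-- to app.job_results (hypothetical), which has no route, so results do
-- not feed back into the topic.
ALTER TOPIC app.job_events
  ADD SOURCE app.jobs
  ON INSERT
  WITH (payload = 'full');

-- Stateless read to verify that messages arrive after an insert.
CONSUME FROM app.job_events FROM EARLIEST LIMIT 10;
```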