SQL Topics & Consumer Groups
Use SQL to create topics, attach source tables, inspect offsets, and do simple topic reads inside KalamDB.
For production workers, the HTTP or SDK consume APIs are still a better fit when you need explicit ack timing and partition selection. The SQL surface is best for setup, inspection, debugging, and simple single-partition flows. SQL consumer groups resume from persisted offsets too, and grouped SQL consumes require an explicit ACK after processing.
Why use groups
Use GROUP when a worker should keep its place in a topic.
Without GROUP, a consume is stateless. Every call starts from the FROM position you requested, so it is good for inspection and replay but not for a durable worker cursor.
With GROUP, KalamDB stores progress in system.topic_offsets for (topic_id, group_id, partition_id). That gives you:
- resume-after-restart behavior for the same logical worker or worker fleet
- independent read positions for different consumers on the same topic
- a way to replay with one group while production workers keep their own offsets
There is no separate CREATE GROUP command. A group is created the first time you consume or acknowledge offsets with a new group_id.
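The keying described above can be pictured with a small in-memory model. This is only a sketch: the `offsets` dict and the `record_ack` and `committed` helpers are hypothetical stand-ins for `system.topic_offsets`, not KalamDB APIs.

```python
from typing import Dict, Optional, Tuple

# Hypothetical stand-in for system.topic_offsets.
OffsetKey = Tuple[str, str, int]  # (topic_id, group_id, partition_id)
offsets: Dict[OffsetKey, int] = {}


def record_ack(topic_id: str, group_id: str, offset: int, partition_id: int = 0) -> None:
    """Store progress for one key; the first call for a new group_id creates the row."""
    key = (topic_id, group_id, partition_id)
    offsets[key] = max(offsets.get(key, offset), offset)


def committed(topic_id: str, group_id: str, partition_id: int = 0) -> Optional[int]:
    """Return the stored offset, or None when the group has never been seen."""
    return offsets.get((topic_id, group_id, partition_id))


# Two groups on the same topic keep independent positions.
record_ack("app.user_events", "email-worker", 41)
record_ack("app.user_events", "audit-replay", 7)
```

Because the group id is part of the key, a replay group can fall behind or be reset without touching the cursor of a production group.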
Stateless reads vs durable reads
Use no group when you want a pure viewer:
    CONSUME FROM demo.message_events FROM EARLIEST LIMIT 100;
That query does not store progress. Run it again and you can replay the same range again.
Use a group when you want a named cursor:
    CONSUME FROM demo.message_events
      GROUP 'ui-debug-admin'
      FROM LATEST
      LIMIT 100;
That query creates or resumes the ui-debug-admin cursor for partition 0.
Recommended starting point
Use a single-partition topic until multi-partition behavior is fully finished:
    CREATE TOPIC app.user_events PARTITIONS 1;
CREATE TOPIC requirements today:
- topic names must be namespace-qualified, like app.user_events
- the namespace must already exist
- CREATE TOPIC IF NOT EXISTS is not currently supported
- topic creation requires dba or system role
CREATE TOPIC
    CREATE TOPIC <topic_name>
      [PARTITIONS <count>]
      [WITH (retention_seconds = <seconds|NULL>, retention_max_bytes = <bytes|NULL>)];
When a topic is created, KalamDB persists it in system.topics and adds it to the in-memory route cache immediately.
Retention options on CREATE TOPIC are optional:
- omit an option to use the server [topics] default
- set an option to NULL to disable that limit for this topic
- retention_max_bytes is enforced per partition, not across the whole topic
Examples:
    CREATE TOPIC app.user_events PARTITIONS 1;
    CREATE TOPIC app.user_events
      PARTITIONS 1
      WITH (retention_seconds = 86400, retention_max_bytes = 268435456);
    CREATE TOPIC app.audit_events
      PARTITIONS 1
      WITH (retention_seconds = 3600, retention_max_bytes = NULL);
In the last example, messages are deleted once they are older than one hour, and there is no byte cap for the topic partitions.
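The literal values in these examples are plain unit conversions: 86400 seconds is one day, 3600 seconds is one hour, and 268435456 bytes is 256 MiB. A small sketch with hypothetical helper names (`days`, `hours`, `mib`) makes that explicit when you generate retention settings from a script:

```python
SECONDS_PER_HOUR = 3600
SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR
BYTES_PER_MIB = 1024 * 1024


def days(n: int) -> int:
    """Convert days to a retention_seconds value."""
    return n * SECONDS_PER_DAY


def hours(n: int) -> int:
    """Convert hours to a retention_seconds value."""
    return n * SECONDS_PER_HOUR


def mib(n: int) -> int:
    """Convert mebibytes to a retention_max_bytes value."""
    return n * BYTES_PER_MIB


print(days(1))    # 86400
print(hours(1))   # 3600
print(mib(256))   # 268435456
```

Remember that the byte value applies to each partition, so the figure produced by `mib` is a per-partition cap.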
Topic retention
Topic retention is a topic-level policy stored in system.topics. It is independent of consumer group ack state.
Current behavior:
- retention_seconds deletes messages older than the configured age
- retention_max_bytes trims the oldest retained messages until the partition is back under the byte cap
- if both limits are active, age-based pruning runs first and byte-based pruning runs after that
- if both limits are NULL, retention is disabled for the topic
- retained message deletion does not rewrite offsets or move consumer group cursors
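The pruning order can be illustrated with a toy model of one partition. This is a sketch under assumptions, not KalamDB's implementation: the `Message` type and `prune` function are hypothetical, messages are assumed to be ordered by offset, and "back under the byte cap" is read here as "at or below the cap".

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Message:
    offset: int
    timestamp_ms: int
    size_bytes: int


def prune(
    messages: List[Message],
    now_ms: int,
    retention_seconds: Optional[int],
    retention_max_bytes: Optional[int],
) -> List[Message]:
    """Return the messages that survive retention for one partition."""
    kept = list(messages)
    # Step 1: age-based pruning runs first.
    if retention_seconds is not None:
        cutoff_ms = now_ms - retention_seconds * 1000
        kept = [m for m in kept if m.timestamp_ms >= cutoff_ms]
    # Step 2: byte-based pruning trims the oldest survivors.
    if retention_max_bytes is not None:
        total = sum(m.size_bytes for m in kept)
        while kept and total > retention_max_bytes:
            total -= kept.pop(0).size_bytes
    return kept


partition = [
    Message(offset=0, timestamp_ms=0, size_bytes=400),
    Message(offset=1, timestamp_ms=5_000, size_bytes=400),
    Message(offset=2, timestamp_ms=9_000, size_bytes=400),
    Message(offset=3, timestamp_ms=10_000, size_bytes=400),
]
kept = prune(partition, now_ms=10_000, retention_seconds=6, retention_max_bytes=900)
print([m.offset for m in kept])  # [2, 3]
```

The surviving messages keep their original offsets, so the earliest retained offset moves up to 2 while nothing is renumbered.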
The default server configuration is 7 days and 1 GiB per partition unless you override it in the [topics] config block:
    [topics]
    default_retention_seconds = 604800
    default_retention_max_bytes = 1073741824
    retention_check_interval_seconds = 3600
    retention_batch_size = 10000
ALTER TOPIC retention
Use ALTER TOPIC when you want to change retention after the topic already exists.
    ALTER TOPIC <topic_name>
      SET RETENTION WITH (retention_seconds = <seconds|NULL>, retention_max_bytes = <bytes|NULL>);
    ALTER TOPIC <topic_name> CLEAR RETENTION;
SET RETENTION only changes the options you specify. Omitted options keep their current value.
Examples:
    ALTER TOPIC app.user_events
      SET RETENTION WITH (retention_seconds = 604800, retention_max_bytes = 1073741824);
    ALTER TOPIC app.user_events
      SET RETENTION WITH (retention_seconds = NULL);
    ALTER TOPIC app.user_events
      SET RETENTION WITH (retention_max_bytes = 536870912);
    ALTER TOPIC app.user_events CLEAR RETENTION;
Use CLEAR RETENTION when you want to disable both limits in one step.
How retention affects reads
Retention removes stored messages, but it does not renumber topic offsets.
- CONSUME FROM ... FROM EARLIEST starts at the earliest currently retained offset
- explicit offsets below that low watermark fail because those messages are no longer available
- a consumer group with a committed offset below the earliest retained offset must be reset before it can continue
Example recovery flow for a lagged group:
    RESET CONSUMER GROUP 'backfill-worker' ON app.user_events TO 250;
    CONSUME FROM app.user_events
      GROUP 'backfill-worker'
      FROM EARLIEST
      LIMIT 100;
That reset changes the next offset for the group. The FROM clause in later grouped reads does not override an already-committed group cursor.
ADD SOURCE
    ALTER TOPIC <topic_name>
      ADD SOURCE <table_name_or_namespace.table_name>
      ON <INSERT|UPDATE|DELETE>
      [WHERE <filter_expression>]
      [WITH (payload = '<key|full|diff>')];
The parser accepts key, full, and diff, but only full is supported today. key and diff are still in development.
Current route behavior:
- a route matches by source table and operation
- one changed row becomes one topic message
- route registration updates system.topics and the in-memory route cache immediately
- ALTER TOPIC ... ADD SOURCE requires dba or system role
You can add separate routes for INSERT, UPDATE, and DELETE:
    ALTER TOPIC app.user_events ADD SOURCE app.users ON INSERT WITH (payload = 'full');
    ALTER TOPIC app.user_events ADD SOURCE app.users ON UPDATE WITH (payload = 'full');
    ALTER TOPIC app.user_events ADD SOURCE app.users ON DELETE WITH (payload = 'full');
Payload modes
Only payload = 'full' has behavior you should rely on today. key and diff are still in development:
    ALTER TOPIC app.user_events
      ADD SOURCE app.users
      ON INSERT
      WITH (payload = 'full');
What those modes mean in the current code:
| Mode | Current behavior |
|---|---|
| key | In development. The parser accepts it, but current publishing code still serializes row JSON without a stable primary-key-only payload shape. |
| full | Publishes the row JSON and injects _table with the source table id. |
| diff | In development. The parser accepts it, but current publishing code still publishes the same full-row payload as full. |
If you want predictable payloads today, use payload = 'full'. Treat key and diff as in development.
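On the consuming side, a `full` payload can be handled as row JSON that carries a `_table` field. The sketch below assumes the stored bytes are a UTF-8 encoded JSON object; the `decode_full_payload` helper, the sample columns, and the sample `_table` value are all hypothetical and only illustrate the shape.

```python
import json
from typing import Any, Dict


def decode_full_payload(payload: bytes) -> Dict[str, Any]:
    """Decode one message payload, assuming UTF-8 JSON produced by payload = 'full'."""
    row = json.loads(payload.decode("utf-8"))
    if not isinstance(row, dict) or "_table" not in row:
        raise ValueError("payload is not a full-row JSON object with a _table field")
    return row


# Hypothetical sample; real rows carry the columns of the source table.
sample = json.dumps({"id": 1, "name": "Ada", "_table": "app.users"}).encode("utf-8")
row = decode_full_payload(sample)
source_table = row["_table"]
```

Checking for `_table` up front gives a worker a cheap way to route messages when one topic has sources on several tables.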
WHERE filters
This syntax is accepted:
    ALTER TOPIC app.user_events
      ADD SOURCE app.users
      ON INSERT
      WHERE is_active = true
      WITH (payload = 'full');
The filter expression is stored in topic metadata, but the current publisher path does not evaluate it yet. Do not rely on WHERE to suppress messages today.
CONSUME FROM
    CONSUME FROM <topic_name>
      [GROUP '<group_id>']
      [FROM <LATEST|EARLIEST|offset>]
      [LIMIT <count>];
Examples:
    CONSUME FROM app.user_events FROM EARLIEST LIMIT 10;
    CONSUME FROM app.user_events
      GROUP 'ai-worker'
      FROM EARLIEST
      LIMIT 10;
Current SQL consume behavior:
- SQL CONSUME reads partition 0 only
- the result columns are topic_id, partition_id, offset, key, payload, timestamp_ms, user_id, and op
- payload is a binary column containing the stored message bytes
- user_id is the canonical producing user id when the message came from a USER-table scoped write
- without GROUP, the read is stateless
- with GROUP, KalamDB first checks system.topic_offsets and resumes from the committed offset when one exists
- if a group has no committed offset yet, the FROM clause decides the initial position
- with GROUP, the returned range is claimed in memory until it is acknowledged or the visibility timeout expires
- use ACK to persist progress after processing a grouped batch
If you need explicit ack timing or partition selection, use the HTTP Topic API.
New group vs existing group
The important rule is:
- for a brand-new group, FROM decides where the first read starts
- for a group that already has a committed offset, KalamDB resumes from that offset and ignores FROM
Examples:
New group that tails only future messages:
    CONSUME FROM demo.message_events
      GROUP 'ui-debug-admin'
      FROM LATEST
      LIMIT 100;
Use that when you create a fresh debug group and only want messages written after the group starts.
New group that replays the backlog:
    CONSUME FROM demo.message_events
      GROUP 'backfill-worker'
      FROM EARLIEST
      LIMIT 100;
Use that when a new worker should process existing messages from the beginning.
Existing group that already consumed part of the topic:
    CONSUME FROM demo.message_events
      GROUP 'backfill-worker'
      FROM LATEST
      LIMIT 100;
Even though this query says FROM LATEST, KalamDB resumes backfill-worker from its committed offset. The group already has a cursor, so the FROM clause no longer changes the starting point.
If you created a new group on the fly and expected to see existing messages, do not use FROM LATEST. Use FROM EARLIEST or FROM <offset> for that first consume.
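The rule in this section, together with the retention low watermark described earlier, can be written as one small decision function. This is a model for reasoning, not KalamDB code: `resolve_start` and its parameters are hypothetical, and `group_cursor` stands for the next offset the group would read according to its committed state.

```python
from typing import Optional, Union

FromClause = Union[str, int]  # "EARLIEST", "LATEST", or an explicit offset


def resolve_start(
    from_clause: FromClause,
    earliest_retained: int,
    next_write_offset: int,
    group_cursor: Optional[int] = None,
) -> int:
    """Pick the offset a consume call starts at."""
    if group_cursor is not None:
        # An existing cursor wins; FROM is ignored.
        if group_cursor < earliest_retained:
            raise LookupError("group cursor is below the earliest retained offset; reset the group")
        return group_cursor
    if from_clause == "EARLIEST":
        return earliest_retained
    if from_clause == "LATEST":
        return next_write_offset
    if not isinstance(from_clause, int):
        raise ValueError("FROM must be EARLIEST, LATEST, or an offset")
    if from_clause < earliest_retained:
        raise LookupError("offset is below the earliest retained offset")
    return from_clause


print(resolve_start("LATEST", 0, 500))                    # 500
print(resolve_start("LATEST", 0, 500, group_cursor=120))  # 120
```

The second call shows the case that surprises people most often: the query says LATEST, but the existing cursor at 120 decides where the read begins.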
How to use groups safely
Use the same group id for one logical worker fleet:
    CONSUME FROM app.user_events GROUP 'email-worker' FROM EARLIEST LIMIT 50;
Use a different group when you want an independent cursor:
    CONSUME FROM app.user_events GROUP 'audit-replay' FROM EARLIEST LIMIT 50;
Use no group when you want a viewer that never advances offsets:
    CONSUME FROM app.user_events FROM EARLIEST LIMIT 50;
Use a throwaway group for one-off debugging when you want group semantics without affecting a real worker group:
    CONSUME FROM app.user_events
      GROUP 'debug-2026-04-24-admin'
      FROM EARLIEST
      LIMIT 50;
That lets you inspect with a separate durable cursor while production groups keep their own positions.
ACK
    ACK <topic_name>
      GROUP '<group_id>'
      [PARTITION <partition_id>]
      UPTO OFFSET <offset>;
Current ack behavior:
- ACK writes to system.topic_offsets
- acknowledgements are monotonic, so a lower offset does not move a group backward
- ACK is per topic, group, and partition
- ACK requires service, dba, or system role
If you are using SQL CONSUME ... GROUP, issue ACK after you finish processing the returned rows. If the caller does not ACK before the topic visibility timeout, the range can be delivered again.
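The interaction between claims, the visibility timeout, and monotonic acknowledgements can be sketched as two pure functions. This is a simplified model with one outstanding claim per group and partition; the function names are hypothetical, and it assumes offsets start at 0 and that the acknowledged offset is the last processed offset.

```python
from typing import Optional


def ack(last_acked: Optional[int], upto: int) -> int:
    """Monotonic acknowledgement: a lower offset never moves the group backward."""
    return upto if last_acked is None else max(last_acked, upto)


def next_start(
    last_acked: Optional[int],
    claimed_upto: Optional[int],
    claim_expires_at: float,
    now: float,
) -> int:
    """Offset the next grouped read starts at for one partition."""
    acked_next = 0 if last_acked is None else last_acked + 1
    claim_is_live = claimed_upto is not None and now < claim_expires_at
    if claim_is_live:
        # The claimed range stays hidden until it is acked or the claim expires.
        return max(acked_next, claimed_upto + 1)
    # No live claim: anything not acknowledged is eligible again.
    return acked_next


# A worker claimed offsets 0..49 at t=0 with a 30 second timeout and never acked.
print(next_start(None, 49, 30.0, now=10.0))  # 50
print(next_start(None, 49, 30.0, now=31.0))  # 0
```

The second call is the redelivery case: the claim expired without an ACK, so the same range becomes eligible again. Workers should therefore be written so that processing a batch twice is harmless.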
RESET CONSUMER GROUP
    RESET CONSUMER GROUP '<group_id>'
      ON <topic_name>
      [PARTITION <partition_id>]
      TO <next_offset>;
Use reset when you deliberately want to move a durable group cursor. This is an admin-only operation and affects only the named topic, group, and partition.
Replay a debug group from the beginning:
    RESET CONSUMER GROUP 'ui-debug-admin1' ON blog.summarizer TO 0;
    CONSUME FROM blog.summarizer
      GROUP 'ui-debug-admin1'
      FROM EARLIEST
      LIMIT 100;
Move a group so the next read starts at offset 250:
    RESET CONSUMER GROUP 'backfill-worker' ON app.user_events PARTITION 0 TO 250;
For inspection without moving or resetting any group, omit GROUP:
    CONSUME FROM blog.summarizer FROM 0 LIMIT 100;
CLEAR TOPIC
    CLEAR TOPIC <topic_name>;
CLEAR TOPIC keeps the topic definition but schedules a background cleanup job to delete stored messages and consumer offsets.
DROP TOPIC
    DROP TOPIC <topic_name>;
DROP TOPIC removes the topic definition and route cache entry immediately, then schedules the same background cleanup job for stored messages and offsets.
Inspect topic state
Topic metadata and consumer progress are queryable:
    SELECT topic_id, partitions, routes, created_at, updated_at
    FROM system.topics
    ORDER BY topic_id;
    SELECT topic_id, group_id, partition_id, last_acked_offset, updated_at
    FROM system.topic_offsets
    ORDER BY topic_id, group_id, partition_id;
The message log itself is internal. You inspect messages with CONSUME, not by querying a system.topic_messages table.
Practical guidance
- use PARTITIONS 1
- use payload = 'full'; it is the only payload mode with fully documented behavior today
- use a separate source table and result table for agent flows so you do not create loops
- treat key and diff as still in development, and treat route WHERE and custom partitioning as incomplete for now