SQL Topics & Consumer Groups
Use SQL to create topics, attach source tables, inspect offsets, and do simple topic reads inside KalamDB.
For production workers, the HTTP or SDK consume APIs are still a better fit when you need explicit ack timing and partition selection. The SQL surface is best for setup, inspection, debugging, and simple single-partition flows. SQL consumer groups resume from persisted offsets too, and grouped SQL consumes require an explicit ACK after processing.
Why use groups
Use GROUP when a worker should keep its place in a topic.
Without GROUP, a consume is stateless. Every call starts from the FROM position you requested, so it is good for inspection and replay but not for a durable worker cursor.
With GROUP, KalamDB stores progress in system.topic_offsets for (topic_id, group_id, partition_id). That gives you:
- resume-after-restart behavior for the same logical worker or worker fleet
- independent read positions for different consumers on the same topic
- a way to replay with one group while production workers keep their own offsets
There is no separate CREATE GROUP command. A group is created the first time you consume or acknowledge offsets with a new group_id.
Stateless reads vs durable reads
Use no group when you want a pure viewer:
That query does not store progress. Run it again and you can replay the same range again.
Use a group when you want a named cursor:
That query creates or resumes the ui-debug-admin cursor for partition 0.
Recommended starting point
Use a single-partition topic until multi-partition behavior is fully finished:
CREATE TOPIC requirements today:
- topic names must be namespace-qualified, like app.user_events
- the namespace must already exist
- CREATE TOPIC IF NOT EXISTS is not currently supported
- topic creation requires dba or system role
CREATE TOPIC
When a topic is created, KalamDB persists it in system.topics and adds it to the in-memory route cache immediately.
Retention options on CREATE TOPIC are optional:
- omit an option to use the server [topics] default
- set an option to NULL to disable that limit for this topic
- retention_max_bytes is enforced per partition, not across the whole topic
Examples:
In the last example, messages are deleted once they are older than one hour, and there is no byte cap for the topic partitions.
Topic retention
Topic retention is a topic-level policy stored in system.topics. It is independent of consumer group ack state.
Current behavior:
- retention_seconds deletes messages older than the configured age
- retention_max_bytes trims the oldest retained messages until the partition is back under the byte cap
- if both limits are active, age-based pruning runs first and byte-based pruning runs after that
- if both limits are NULL, retention is disabled for the topic
- retained message deletion does not rewrite offsets or move consumer group cursors
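The pruning order above can be sketched as a toy Python model for one partition. This is an illustration of the documented rules, not KalamDB's implementation: the Message class, the apply_retention function, and the exact boundary handling (a message exactly at the age cutoff is kept, a partition exactly at the byte cap is not trimmed) are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class Message:
    offset: int
    timestamp_ms: int
    size_bytes: int


def apply_retention(messages, now_ms, retention_seconds=None, retention_max_bytes=None):
    """Return the messages that survive retention in one partition.

    `messages` must be ordered by ascending offset. None stands in for
    SQL NULL, meaning that limit is disabled (hypothetical helper).
    """
    kept = list(messages)

    # Step 1: age-based pruning runs first.
    if retention_seconds is not None:
        cutoff_ms = now_ms - retention_seconds * 1000
        kept = [m for m in kept if m.timestamp_ms >= cutoff_ms]

    # Step 2: byte-based pruning trims the oldest survivors until the
    # partition is back within the cap.
    if retention_max_bytes is not None:
        total = sum(m.size_bytes for m in kept)
        while kept and total > retention_max_bytes:
            total -= kept.pop(0).size_bytes

    return kept


log = [
    Message(offset=0, timestamp_ms=1_000, size_bytes=400),
    Message(offset=1, timestamp_ms=2_000, size_bytes=400),
    Message(offset=2, timestamp_ms=3_000, size_bytes=400),
    Message(offset=3, timestamp_ms=4_000, size_bytes=400),
]
kept = apply_retention(log, now_ms=5_000, retention_seconds=3, retention_max_bytes=800)
print([m.offset for m in kept])  # [2, 3]
```

The surviving messages keep their original offsets (2 and 3), which mirrors the rule that deletion does not renumber offsets or move group cursors.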
The default server configuration is 7 days and 1 GiB per partition unless you override it in the [topics] config block:
ALTER TOPIC retention
Use ALTER TOPIC when you want to change retention after the topic already exists.
SET RETENTION only changes the options you specify. Omitted options keep their current value.
Examples:
Use CLEAR RETENTION when you want to disable both limits in one step.
How retention affects reads
Retention removes stored messages, but it does not renumber topic offsets.
- CONSUME FROM ... FROM EARLIEST starts at the earliest currently retained offset
- explicit offsets below that low watermark fail because those messages are no longer available
- a consumer group with a committed offset below the earliest retained offset must be reset before it can continue
Example recovery flow for a lagged group:
That reset changes the next offset for the group. The FROM clause in later grouped reads does not override an already-committed group cursor.
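The low-watermark rules in this section can be modelled in a few lines of Python. This is a hedged sketch only: OffsetNoLongerRetained, stateless_start, and group_needs_reset are hypothetical names, and the actual error KalamDB returns is not shown in this page.

```python
class OffsetNoLongerRetained(Exception):
    """Hypothetical error for a read that starts below the low watermark."""


def stateless_start(from_clause, earliest_retained):
    """Resolve the start offset of a read without GROUP.

    `from_clause` is either the string "EARLIEST" or an explicit integer offset.
    """
    if from_clause == "EARLIEST":
        # EARLIEST means the earliest offset that retention has not removed.
        return earliest_retained
    if from_clause < earliest_retained:
        raise OffsetNoLongerRetained(
            f"offset {from_clause} is below the earliest retained offset {earliest_retained}"
        )
    return from_clause


def group_needs_reset(committed_offset, earliest_retained):
    """True when a group's committed offset fell behind retention."""
    return committed_offset < earliest_retained


print(stateless_start("EARLIEST", earliest_retained=120))  # 120
print(group_needs_reset(committed_offset=90, earliest_retained=120))  # True
```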
ADD SOURCE
The parser accepts key, full, and diff, but only full is supported today. key and diff are still in development.
Current route behavior:
- a route matches by source table and operation
- an optional row-local WHERE predicate is evaluated before a message is published
- one changed row becomes one topic message
- route registration updates system.topics and the in-memory route cache immediately
- ALTER TOPIC ... ADD SOURCE requires dba or system role
You can add separate routes for INSERT, UPDATE, and DELETE:
Payload modes
Only payload = 'full' has behavior you should rely on today. key and diff are still in development:
What those modes mean in the current code:
| Mode | Current behavior |
|---|---|
| key | In development. The parser accepts it, but current publishing code still serializes row JSON without a stable primary-key-only payload shape. |
| full | Publishes the row JSON and injects _table with the source table id. |
| diff | In development. The parser accepts it, but current publishing code still publishes the same full-row payload as full. |
If you want predictable payloads today, use payload = 'full'. Treat key and diff as in development.
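As a rough illustration of what a consumer might do with a full payload, the Python sketch below builds and decodes one. It assumes the stored bytes are a UTF-8 encoded JSON object and that _table sits alongside the row fields; neither detail is confirmed here, and build_full_payload, decode_payload, and the app.tasks table id are hypothetical.

```python
import json


def build_full_payload(row, table_id):
    """Model of payload = 'full': the row JSON plus an injected _table field."""
    message = dict(row)
    message["_table"] = table_id
    return json.dumps(message).encode("utf-8")


def decode_payload(payload_bytes):
    """Decode the binary payload column back into a dict (assumes UTF-8 JSON)."""
    return json.loads(payload_bytes.decode("utf-8"))


raw = build_full_payload({"id": 7, "cancelled": True}, table_id="app.tasks")
event = decode_payload(raw)
print(event["_table"], event["id"])
```

Checking _table first lets a worker that reads several source tables from one topic dispatch on the table id before touching row fields.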
WHERE filters
Use WHERE when only some row changes should publish to the topic. The predicate is evaluated against the row routed for the selected operation.
That is useful for worker queues that should only wake up on a subset of writes. For example, route only cancelled tasks into a cleanup worker topic:
With those two routes:
- a task inserted with cancelled = true publishes immediately
- a task that starts active and is later updated to cancelled = true publishes on that update
- rows where cancelled = false are ignored by these routes
Keep the predicate row-local. Subqueries are rejected for route filters.
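The cancelled-task example can be modelled in Python to show how a route combines source table, operation, and a row-local predicate. This is a simplified sketch, not the server's routing code; Route, messages_for_change, and the app.tasks table name are assumptions for the example.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Route:
    table: str
    op: str  # "INSERT", "UPDATE", or "DELETE"
    predicate: Optional[Callable[[dict], bool]] = None  # row-local WHERE filter


def messages_for_change(routes, table, op, row):
    """Return one message per route that matches a single changed row."""
    published = []
    for route in routes:
        # A route matches by source table and operation.
        if route.table != table or route.op != op:
            continue
        # The optional predicate sees only this row, never other rows or tables.
        if route.predicate is not None and not route.predicate(row):
            continue
        published.append(dict(row))
    return published


def is_cancelled(row):
    return row.get("cancelled") is True


routes = [
    Route("app.tasks", "INSERT", is_cancelled),
    Route("app.tasks", "UPDATE", is_cancelled),
]

print(len(messages_for_change(routes, "app.tasks", "INSERT", {"id": 1, "cancelled": True})))   # 1
print(len(messages_for_change(routes, "app.tasks", "UPDATE", {"id": 2, "cancelled": False})))  # 0
```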
CONSUME FROM
Examples:
Current SQL consume behavior:
- SQL CONSUME reads partition 0 only
- the result columns are topic_id, partition_id, offset, key, payload, timestamp_ms, user_id, and op
- payload is a binary column containing the stored message bytes
- user_id is the canonical producing user id when the message came from a USER-table scoped write
- without GROUP, the read is stateless
- with GROUP, KalamDB first checks system.topic_offsets and resumes from the committed offset when one exists
- if a group has no committed offset yet, the FROM clause decides the initial position
- with GROUP, the returned range is claimed in memory until it is acknowledged or the visibility timeout expires
- use ACK to persist progress after processing a grouped batch
If you need explicit ack timing or partition selection, use the HTTP Topic API.
New group vs existing group
The important rule is:
- for a brand-new group, FROM decides where the first read starts
- for a group that already has a committed offset, KalamDB resumes from that offset and ignores FROM
Examples:
New group that tails only future messages:
Use that when you create a fresh debug group and only want messages written after the group starts.
New group that replays the backlog:
Use that when a new worker should process existing messages from the beginning.
Existing group that already consumed part of the topic:
Even though this query says FROM LATEST, KalamDB resumes backfill-worker from its committed offset. The group already has a cursor, so the FROM clause no longer changes the starting point.
If you created a new group on the fly and expected to see existing messages, do not use FROM LATEST. Use FROM EARLIEST or FROM <offset> for that first consume.
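The new-group versus existing-group rule reduces to a small decision function. The Python sketch below is a model under stated assumptions, not KalamDB code: starting_offset is a hypothetical helper, the committed offset is treated as the next offset to read, and LATEST is modelled as the offset the next written message will receive.

```python
def starting_offset(committed_offset, from_clause, earliest_offset, next_offset):
    """Pick where a grouped read begins.

    committed_offset: None for a brand-new group, else the stored cursor.
    from_clause: "EARLIEST", "LATEST", or an explicit integer offset.
    earliest_offset: earliest offset currently retained in the partition.
    next_offset: offset the next published message will get.
    """
    if committed_offset is not None:
        # Existing group: the stored cursor wins and FROM is ignored.
        return committed_offset
    if from_clause == "EARLIEST":
        return earliest_offset
    if from_clause == "LATEST":
        # Only messages written after this point are visible.
        return next_offset
    return int(from_clause)


# Brand-new group tailing future messages.
print(starting_offset(None, "LATEST", earliest_offset=0, next_offset=500))      # 500
# Brand-new group replaying the backlog.
print(starting_offset(None, "EARLIEST", earliest_offset=0, next_offset=500))    # 0
# Existing group: FROM LATEST does not skip ahead.
print(starting_offset(180, "LATEST", earliest_offset=0, next_offset=500))       # 180
```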
How to use groups safely
Use the same group id for one logical worker fleet:
Use a different group when you want an independent cursor:
Use no group when you want a viewer that never advances offsets:
Use a throwaway group for one-off debugging when you want group semantics without affecting a real worker group:
That lets you inspect with a separate durable cursor while production groups keep their own positions.
ACK
Current ack behavior:
- ACK writes to system.topic_offsets
- acknowledgements are monotonic, so a lower offset does not move a group backward
- ACK is per topic, group, and partition
- ACK requires service, dba, or system role
If you are using SQL CONSUME ... GROUP, issue ACK after you finish processing the returned rows. If the caller does not ACK before the topic visibility timeout, the range can be delivered again.
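A toy Python model of these acknowledgement rules, using a plain dict in place of system.topic_offsets. It is a sketch rather than the real storage logic: ack and claim_expired are hypothetical helpers, and treating a claim as expired exactly at the timeout is an assumption.

```python
def ack(offsets, topic_id, group_id, partition_id, offset):
    """Record an acknowledgement; a lower offset never moves the cursor back."""
    key = (topic_id, group_id, partition_id)
    offsets[key] = max(offsets.get(key, offset), offset)
    return offsets[key]


def claim_expired(claimed_at_s, visibility_timeout_s, now_s):
    """True once an unacknowledged claim is old enough to be delivered again."""
    return now_s - claimed_at_s >= visibility_timeout_s


offsets = {}
print(ack(offsets, "app.user_events", "workers", 0, 10))  # 10
print(ack(offsets, "app.user_events", "workers", 0, 5))   # 10, stale ack ignored
print(ack(offsets, "app.user_events", "workers", 0, 12))  # 12
print(ack(offsets, "app.user_events", "debug", 0, 3))     # 3, separate group cursor
```

Because the key includes the group id, the debug group's acknowledgement leaves the workers cursor at 12.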
RESET CONSUMER GROUP
Use reset when you deliberately want to move a durable group cursor. This is an admin-only operation and affects only the named topic, group, and partition.
Replay a debug group from the beginning:
Move a group so the next read starts at offset 250:
For inspection without moving or resetting any group, omit GROUP:
CLEAR TOPIC
CLEAR TOPIC keeps the topic definition but schedules a background cleanup job to delete stored messages and consumer offsets.
DROP TOPIC
DROP TOPIC removes the topic definition and route cache entry immediately, then schedules the same background cleanup job for stored messages and offsets.
Inspect topic state
Topic metadata and consumer progress are queryable:
The message log itself is internal. You inspect messages with CONSUME, not by querying a system.topic_messages table.
Practical guidance
- use PARTITIONS 1
- use payload = 'full'; it is the only payload mode with fully documented behavior today
- use a separate source table and result table for agent flows so you do not create loops
- treat key and diff as still in development, and treat route WHERE and custom partitioning as incomplete for now