SQL Topics & Consumer Groups
Use SQL to create topics, attach source tables, inspect offsets, and do simple topic reads inside KalamDB.
For production workers, the HTTP or SDK consume APIs are still a better fit when you need explicit ack timing and partition selection. The SQL surface is best for setup, inspection, debugging, and simple single-partition flows. SQL consumer groups now resume from persisted offsets too, but SQL CONSUME ... GROUP auto-acks the last returned offset in each batch.
Why use groups
Use GROUP when a worker should keep its place in a topic.
Without GROUP, a consume is stateless. Every call starts from the FROM position you requested, so it is good for inspection and replay but not for a durable worker cursor.
With GROUP, KalamDB stores progress in system.topic_offsets for (topic_id, group_id, partition_id). That gives you:
- resume-after-restart behavior for the same logical worker or worker fleet
- independent read positions for different consumers on the same topic
- a way to replay with one group while production workers keep their own offsets
There is no separate CREATE GROUP command. A group is created the first time you consume or acknowledge offsets with a new group_id.
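For example, a first consume with a fresh group_id is enough to create the group, and its cursor then shows up in system.topic_offsets. This sketch assumes an existing topic named app.user_events:

```sql
-- First consume with a new group_id implicitly creates the group.
CONSUME FROM app.user_events GROUP 'new-worker' FROM EARLIEST LIMIT 10;

-- The group's cursor is now visible as a committed offset row.
SELECT topic_id, group_id, partition_id, last_acked_offset
FROM system.topic_offsets
WHERE group_id = 'new-worker';
```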
Stateless reads vs durable reads
Use no group when you want a pure viewer:
CONSUME FROM demo.message_events FROM EARLIEST LIMIT 100;

That query does not store progress. Run it again and you can replay the same range again.
Use a group when you want a named cursor:
CONSUME FROM demo.message_events
GROUP 'ui-debug-admin'
FROM LATEST
LIMIT 100;

That query creates or resumes the ui-debug-admin cursor for partition 0.
Recommended starting point
Use a single-partition topic until multi-partition behavior is fully finished:
CREATE TOPIC app.user_events PARTITIONS 1;

CREATE TOPIC requirements today:
- topic names must be namespace-qualified, like app.user_events
- the namespace must already exist
- CREATE TOPIC IF NOT EXISTS is not currently supported
- topic creation requires the dba or system role
CREATE TOPIC
CREATE TOPIC <topic_name>;
CREATE TOPIC <topic_name> PARTITIONS <count>;

When a topic is created, KalamDB persists it in system.topics and adds it to the in-memory route cache immediately.
ADD SOURCE
ALTER TOPIC <topic_name>
ADD SOURCE <table_name_or_namespace.table_name>
ON <INSERT|UPDATE|DELETE>
[WHERE <filter_expression>]
[WITH (payload = '<key|full|diff>')];

Current route behavior:
- a route matches by source table and operation
- one changed row becomes one topic message
- route registration updates system.topics and the in-memory route cache immediately
- ALTER TOPIC ... ADD SOURCE requires the dba or system role
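As a sketch of the one-row-one-message rule, assuming a hypothetical app.users table with id and email columns and an INSERT route into app.user_events:

```sql
-- Route INSERTs on app.users into app.user_events.
ALTER TOPIC app.user_events ADD SOURCE app.users ON INSERT WITH (payload = 'full');

-- Each inserted row becomes exactly one topic message.
INSERT INTO app.users (id, email) VALUES (1, 'a@example.com');
INSERT INTO app.users (id, email) VALUES (2, 'b@example.com');

-- A stateless read should now show two messages.
CONSUME FROM app.user_events FROM EARLIEST LIMIT 10;
```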
You can add separate routes for INSERT, UPDATE, and DELETE:
ALTER TOPIC app.user_events ADD SOURCE app.users ON INSERT WITH (payload = 'full');
ALTER TOPIC app.user_events ADD SOURCE app.users ON UPDATE WITH (payload = 'full');
ALTER TOPIC app.user_events ADD SOURCE app.users ON DELETE WITH (payload = 'full');

Payload modes
The parser accepts key, full, and diff, but only full has behavior you should rely on today:
ALTER TOPIC app.user_events
ADD SOURCE app.users
ON INSERT
WITH (payload = 'full');

What those modes mean in the current code:
| Mode | Current behavior |
|---|---|
| key | Publishes the row JSON without _table metadata. It is not limited to primary-key columns yet. |
| full | Publishes the row JSON and injects _table with the source table id. |
| diff | Accepted by the parser, but currently published the same way as full. Treat it as experimental. |
If you want predictable payloads today, use payload = 'full'. Treat key and diff as incomplete or experimental.
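As an illustration only (the exact serialization is not specified here), a full-mode message for a hypothetical app.users row with id and email columns might decode to JSON along these lines, while key mode would omit the _table field:

```sql
-- Illustrative only: with payload = 'full', the payload bytes decode to the
-- row JSON plus an injected _table field, e.g.
--   {"id": 1, "email": "a@example.com", "_table": "<source table id>"}
-- With payload = 'key', the same row JSON is published without _table
-- (it is NOT limited to primary-key columns yet).
CONSUME FROM app.user_events FROM EARLIEST LIMIT 1;
```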
WHERE filters
This syntax is accepted:
ALTER TOPIC app.user_events
ADD SOURCE app.users
ON INSERT
WHERE is_active = true
WITH (payload = 'full');

The filter expression is stored in topic metadata, but the current publisher path does not evaluate it yet. Do not rely on WHERE to suppress messages today.
CONSUME FROM
CONSUME FROM <topic_name>
[GROUP '<group_id>']
[FROM <LATEST|EARLIEST|offset>]
[LIMIT <count>];

Examples:
CONSUME FROM app.user_events FROM EARLIEST LIMIT 10;
CONSUME FROM app.user_events
GROUP 'ai-worker'
FROM EARLIEST
LIMIT 10;

Current SQL consume behavior:
- SQL CONSUME reads partition 0 only
- the result columns are topic_id, partition_id, offset, key, payload, timestamp_ms, user_id, and op
- payload is a binary column containing the stored message bytes
- user_id is the canonical producing user id when the message came from a USER-table scoped write
- without GROUP, the read is stateless
- with GROUP, KalamDB first checks system.topic_offsets and resumes from the committed offset when one exists
- if a group has no committed offset yet, the FROM clause decides the initial position
- with GROUP, the SQL handler auto-commits the last returned offset for that batch on partition 0
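The auto-commit behavior is easy to observe by pairing a group read with a query on system.topic_offsets (topic and group names are illustrative):

```sql
-- Group read: resumes from any committed offset, otherwise honors FROM.
-- Returns columns: topic_id, partition_id, offset, key, payload,
-- timestamp_ms, user_id, op.
CONSUME FROM app.user_events GROUP 'ai-worker' FROM EARLIEST LIMIT 10;

-- Because SQL CONSUME ... GROUP auto-commits, the last returned offset
-- is now the group's cursor on partition 0.
SELECT last_acked_offset
FROM system.topic_offsets
WHERE group_id = 'ai-worker' AND partition_id = 0;
```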
If you need explicit ack timing or partition selection, use the HTTP Topic API.
New group vs existing group
The important rule is:
- for a brand-new group, FROM decides where the first read starts
- for a group that already has a committed offset, KalamDB resumes from that offset and ignores FROM
Examples:
New group that tails only future messages:
CONSUME FROM demo.message_events
GROUP 'ui-debug-admin'
FROM LATEST
LIMIT 100;

Use that when you create a fresh debug group and only want messages written after the group starts.
New group that replays the backlog:
CONSUME FROM demo.message_events
GROUP 'backfill-worker'
FROM EARLIEST
LIMIT 100;

Use that when a new worker should process existing messages from the beginning.
Existing group that already consumed part of the topic:
CONSUME FROM demo.message_events
GROUP 'backfill-worker'
FROM LATEST
LIMIT 100;

Even though this query says FROM LATEST, KalamDB resumes backfill-worker from its committed offset. The group already has a cursor, so the FROM clause no longer changes the starting point.
If you created a new group on the fly and expected to see existing messages, do not use FROM LATEST. Use FROM EARLIEST or FROM <offset> for that first consume.
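A first consume can also start from an explicit offset; for a brand-new group the FROM <offset> position wins, and once a cursor exists it takes over (group name is illustrative):

```sql
-- New group: start the cursor at offset 42 on partition 0.
CONSUME FROM demo.message_events
GROUP 'replay-from-42'
FROM 42
LIMIT 100;

-- Later reads for this group resume from the committed offset;
-- the FROM clause is ignored once the cursor exists.
```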
How to use groups safely
Use the same group id for one logical worker fleet:
CONSUME FROM app.user_events GROUP 'email-worker' FROM EARLIEST LIMIT 50;

Use a different group when you want an independent cursor:
CONSUME FROM app.user_events GROUP 'audit-replay' FROM EARLIEST LIMIT 50;

Use no group when you want a viewer that never advances offsets:
CONSUME FROM app.user_events FROM EARLIEST LIMIT 50;

Use a throwaway group for one-off debugging when you want group semantics without affecting a real worker group:
CONSUME FROM app.user_events
GROUP 'debug-2026-04-24-admin'
FROM EARLIEST
LIMIT 50;

That lets you inspect with a separate durable cursor while production groups keep their own positions.
ACK
ACK <topic_name>
GROUP '<group_id>'
[PARTITION <partition_id>]
UPTO OFFSET <offset>;

Current ack behavior:
- ACK writes to system.topic_offsets
- acknowledgements are monotonic, so a lower offset does not move a group backward
- ACK is per topic, group, and partition
- ACK requires the service, dba, or system role
If you are using SQL CONSUME ... GROUP, the current handler already auto-acks the last returned offset for the returned batch. Manual ACK is mainly useful alongside HTTP or SDK consumers, or when you want to move a group forward explicitly.
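A manual ack sketch for an HTTP or SDK consumer that has processed everything up to offset 99 on partition 0 (topic and group names are illustrative):

```sql
-- Commit the group's cursor up to offset 99.
ACK app.user_events
GROUP 'email-worker'
PARTITION 0
UPTO OFFSET 99;

-- Acks are monotonic: a lower offset is a no-op and does not
-- move the group backward.
ACK app.user_events
GROUP 'email-worker'
PARTITION 0
UPTO OFFSET 10;
```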
CLEAR TOPIC
CLEAR TOPIC <topic_name>;

CLEAR TOPIC keeps the topic definition but schedules a background cleanup job to delete stored messages and consumer offsets.
DROP TOPIC
DROP TOPIC <topic_name>;

DROP TOPIC removes the topic definition and route cache entry immediately, then schedules the same background cleanup job for stored messages and offsets.
Inspect topic state
Topic metadata and consumer progress are queryable:
SELECT topic_id, partitions, routes, created_at, updated_at
FROM system.topics
ORDER BY topic_id;
SELECT topic_id, group_id, partition_id, last_acked_offset, updated_at
FROM system.topic_offsets
ORDER BY topic_id, group_id, partition_id;

The message log itself is internal. You inspect messages with CONSUME, not by querying a system.topic_messages table.
Practical guidance
- use PARTITIONS 1
- use payload = 'full'; it is the only payload mode with fully documented behavior today
- use a separate source table and result table for agent flows so you do not create loops
- treat key, diff, route WHERE, and custom partitioning as incomplete or experimental for now