Skip to Content
Topic Pub/SubSQL Topics & Consumers

SQL Topics & Consumer Groups

Use SQL to create topics, attach source tables, inspect offsets, and do simple topic reads inside KalamDB.

For production workers, the HTTP or SDK consume APIs are still a better fit when you need explicit ack timing and partition selection. The SQL surface is best for setup, inspection, debugging, and simple single-partition flows. SQL consumer groups now resume from persisted offsets too, but SQL CONSUME ... GROUP auto-acks the last returned offset in each batch.

Why use groups

Use GROUP when a worker should keep its place in a topic.

Without GROUP, a consume is stateless. Every call starts from the FROM position you requested, so it is good for inspection and replay but not for a durable worker cursor.

With GROUP, KalamDB stores progress in system.topic_offsets for (topic_id, group_id, partition_id). That gives you:

  • resume-after-restart behavior for the same logical worker or worker fleet
  • independent read positions for different consumers on the same topic
  • a way to replay with one group while production workers keep their own offsets

There is no separate CREATE GROUP command. A group is created the first time you consume or acknowledge offsets with a new group_id.

Stateless reads vs durable reads

Use no group when you want a pure viewer:

CONSUME FROM demo.message_events FROM EARLIEST LIMIT 100;

That query does not store progress. Run it again and you can replay the same range again.

Use a group when you want a named cursor:

CONSUME FROM demo.message_events GROUP 'ui-debug-admin' FROM LATEST LIMIT 100;

That query creates or resumes the ui-debug-admin cursor for partition 0.

Use a single-partition topic until multi-partition behavior is fully finished:

CREATE TOPIC app.user_events PARTITIONS 1;

CREATE TOPIC requirements today:

  • topic names must be namespace-qualified, like app.user_events
  • the namespace must already exist
  • CREATE TOPIC IF NOT EXISTS is not currently supported
  • topic creation requires dba or system role

CREATE TOPIC

CREATE TOPIC <topic_name>; CREATE TOPIC <topic_name> PARTITIONS <count>;

When a topic is created, KalamDB persists it in system.topics and adds it to the in-memory route cache immediately.

ADD SOURCE

ALTER TOPIC <topic_name> ADD SOURCE <table_name_or_namespace.table_name> ON <INSERT|UPDATE|DELETE> [WHERE <filter_expression>] [WITH (payload = '<key|full|diff>')];

Current route behavior:

  • a route matches by source table and operation
  • one changed row becomes one topic message
  • route registration updates system.topics and the in-memory route cache immediately
  • ALTER TOPIC ... ADD SOURCE requires dba or system role

You can add separate routes for INSERT, UPDATE, and DELETE:

ALTER TOPIC app.user_events ADD SOURCE app.users ON INSERT WITH (payload = 'full'); ALTER TOPIC app.user_events ADD SOURCE app.users ON UPDATE WITH (payload = 'full'); ALTER TOPIC app.user_events ADD SOURCE app.users ON DELETE WITH (payload = 'full');

Payload modes

The parser accepts key, full, and diff, but only full has behavior you should rely on today:

ALTER TOPIC app.user_events ADD SOURCE app.users ON INSERT WITH (payload = 'full');

What those modes mean in the current code:

ModeCurrent behavior
keyPublishes the row JSON without _table metadata. It is not limited to primary-key columns yet.
fullPublishes the row JSON and injects _table with the source table id.
diffAccepted by the parser, but currently published the same way as full. Treat it as experimental.

If you want predictable payloads today, use payload = 'full'. Treat key and diff as incomplete or experimental.

WHERE filters

This syntax is accepted:

ALTER TOPIC app.user_events ADD SOURCE app.users ON INSERT WHERE is_active = true WITH (payload = 'full');

The filter expression is stored in topic metadata, but the current publisher path does not evaluate it yet. Do not rely on WHERE to suppress messages today.

CONSUME FROM

CONSUME FROM <topic_name> [GROUP '<group_id>'] [FROM <LATEST|EARLIEST|offset>] [LIMIT <count>];

Examples:

CONSUME FROM app.user_events FROM EARLIEST LIMIT 10; CONSUME FROM app.user_events GROUP 'ai-worker' FROM EARLIEST LIMIT 10;

Current SQL consume behavior:

  • SQL CONSUME reads partition 0 only
  • the result columns are topic_id, partition_id, offset, key, payload, timestamp_ms, user_id, and op
  • payload is a binary column containing the stored message bytes
  • user_id is the canonical producing user id when the message came from a USER-table scoped write
  • without GROUP, the read is stateless
  • with GROUP, KalamDB first checks system.topic_offsets and resumes from the committed offset when one exists
  • if a group has no committed offset yet, the FROM clause decides the initial position
  • with GROUP, the SQL handler auto-commits the last returned offset for that batch on partition 0

If you need explicit ack timing or partition selection, use the HTTP Topic API.

New group vs existing group

The important rule is:

  • for a brand-new group, FROM decides where the first read starts
  • for a group that already has a committed offset, KalamDB resumes from that offset and ignores FROM

Examples:

New group that tails only future messages:

CONSUME FROM demo.message_events GROUP 'ui-debug-admin' FROM LATEST LIMIT 100;

Use that when you create a fresh debug group and only want messages written after the group starts.

New group that replays the backlog:

CONSUME FROM demo.message_events GROUP 'backfill-worker' FROM EARLIEST LIMIT 100;

Use that when a new worker should process existing messages from the beginning.

Existing group that already consumed part of the topic:

CONSUME FROM demo.message_events GROUP 'backfill-worker' FROM LATEST LIMIT 100;

Even though this query says FROM LATEST, KalamDB resumes backfill-worker from its committed offset. The group already has a cursor, so the FROM clause no longer changes the starting point.

If you created a new group on the fly and expected to see existing messages, do not use FROM LATEST. Use FROM EARLIEST or FROM <offset> for that first consume.

How to use groups safely

Use the same group id for one logical worker fleet:

CONSUME FROM app.user_events GROUP 'email-worker' FROM EARLIEST LIMIT 50;

Use a different group when you want an independent cursor:

CONSUME FROM app.user_events GROUP 'audit-replay' FROM EARLIEST LIMIT 50;

Use no group when you want a viewer that never advances offsets:

CONSUME FROM app.user_events FROM EARLIEST LIMIT 50;

Use a throwaway group for one-off debugging when you want group semantics without affecting a real worker group:

CONSUME FROM app.user_events GROUP 'debug-2026-04-24-admin' FROM EARLIEST LIMIT 50;

That lets you inspect with a separate durable cursor while production groups keep their own positions.

ACK

ACK <topic_name> GROUP '<group_id>' [PARTITION <partition_id>] UPTO OFFSET <offset>;

Current ack behavior:

  • ACK writes to system.topic_offsets
  • acknowledgements are monotonic, so a lower offset does not move a group backward
  • ACK is per topic, group, and partition
  • ACK requires service, dba, or system role

If you are using SQL CONSUME ... GROUP, the current handler already auto-acks the last returned offset for the returned batch. Manual ACK is mainly useful alongside HTTP or SDK consumers, or when you want to move a group forward explicitly.

CLEAR TOPIC

CLEAR TOPIC <topic_name>;

CLEAR TOPIC keeps the topic definition but schedules a background cleanup job to delete stored messages and consumer offsets.

DROP TOPIC

DROP TOPIC <topic_name>;

DROP TOPIC removes the topic definition and route cache entry immediately, then schedules the same background cleanup job for stored messages and offsets.

Inspect topic state

Topic metadata and consumer progress are queryable:

SELECT topic_id, partitions, routes, created_at, updated_at FROM system.topics ORDER BY topic_id; SELECT topic_id, group_id, partition_id, last_acked_offset, updated_at FROM system.topic_offsets ORDER BY topic_id, group_id, partition_id;

The message log itself is internal. You inspect messages with CONSUME, not by querying a system.topic_messages table.

Practical guidance

  • use PARTITIONS 1
  • use payload = 'full'; it is the only payload mode with fully documented behavior today
  • use a separate source table and result table for agent flows so you do not create loops
  • treat key, diff, route WHERE, and custom partitioning as incomplete or experimental for now
Last updated on