Getting Started with Topics
Use this page to build the smallest useful pub/sub flow in KalamDB.
We will keep it single-partition and minimal, but this version uses a USER table because that is the right starting point for per-user chat and agent workflows.
Why use pub/sub here?
Without pub/sub, an AI worker has to keep polling a table to discover new prompts.
With topics:
- your app writes a normal SQL row
- KalamDB turns that row change into a durable topic message
- the AI worker consumes the message
- the worker writes the reply back with normal SQL
That gives you a clean handoff between user-facing writes and background AI work.
Why use a USER table here?
demo.messages is a logical table name shared by all users, but each authenticated user writes into their own isolated USER-table scope.
That means:
- Alice writes her own
demo.messagesrows - Bob writes his own
demo.messagesrows - the agent can see which user produced the topic message
- the agent can write the assistant reply back into the same USER table for that same user
There is no separate tenant field in the current topic message envelope. For USER tables, the producing user is the ownership signal.
1. Create a namespace and a USER table
This table keeps both user and assistant messages together.
CREATE NAMESPACE demo;
CREATE USER TABLE demo.messages (
id TEXT PRIMARY KEY DEFAULT ULID(),
author TEXT NOT NULL,
msg TEXT NOT NULL,
attachment FILE,
created TIMESTAMP NOT NULL DEFAULT NOW()
);Notes:
ULID()gives you a sortable text IDattachment FILEis optional- the owning user does not need a separate column because USER-table scope already tracks that
2. Create a single-partition topic
CREATE TOPIC demo.message_events PARTITIONS 1;Use PARTITIONS 1 for now. Multi-partition topics still have experimental edges in the current implementation.
3. Route message writes into the topic
ALTER TOPIC demo.message_events
ADD SOURCE demo.messages
ON INSERT
WITH (payload = 'full');Now every inserted row in demo.messages becomes one topic message in demo.message_events.
If you also want edit and delete events, add optional routes:
ALTER TOPIC demo.message_events ADD SOURCE demo.messages ON UPDATE WITH (payload = 'full');
ALTER TOPIC demo.message_events ADD SOURCE demo.messages ON DELETE WITH (payload = 'full');When those routes are active, the topic envelope changes op to Update or Delete accordingly.
4. Let a user write a message
INSERT INTO demo.messages (author, msg)
VALUES ('user', 'Write a short summary of this support ticket.');If user_123 runs that insert while authenticated as user_123, the new row belongs to user_123’s USER-table scope.
That insert is the only thing the app has to do. KalamDB handles topic publication inside the write path.
5. Inspect the topic in SQL
CONSUME FROM demo.message_events
FROM EARLIEST
LIMIT 1;The result contains topic, partition, offset, key, payload, timestamp_ms, and op.
In this example, the binary payload contains the inserted row as JSON bytes plus _table: "demo.messages" because the route uses payload = 'full'.
The current SQL result columns are topic_id, partition_id, offset, key, payload, timestamp_ms, user_id, and op.
SQL CONSUME is useful for inspection, and it now includes the producing user_id when the message came from a USER-table scoped write.
6. What the agent actually needs from the topic message
The current topic message design does preserve the producing user for USER-table writes.
When the agent consumes through the HTTP topic API or the SDK consumer, the message envelope includes:
{
"topic_id": "demo.message_events",
"partition_id": 0,
"offset": 0,
"user": "user_123",
"op": "Insert"
}And the decoded payload looks like:
{
"id": "01HS7Y6W0R5J4Q1M9A1Q7X2ABC",
"author": "user",
"msg": "Write a short summary of this support ticket.",
"attachment": null,
"created": "2026-04-18T12:00:00Z",
"_table": "demo.messages"
}This is the current identity story in topic messages:
usertells the agent which USER-table scope owns the rowoptells the agent whether the event came from an insert, update, or delete route_tabletells the agent which table produced the message- there is no separate tenant field beyond that user scope today
7. Let the agent write back into the same USER table
Once the agent knows the producing user, it can write the assistant reply into that same user’s demo.messages table scope:
EXECUTE AS USER 'user_123' (
INSERT INTO demo.messages (author, msg)
VALUES ('assistant', 'Here is a short summary of the ticket...')
);That is the key reason the producing user field matters for USER-table topic consumers.
8. Read the conversation back
SELECT id, author, msg, created
FROM demo.messages
ORDER BY created;What this pattern gives you
- the app keeps using simple SQL writes
- each user keeps their own isolated
demo.messagesrows - the AI worker gets a durable queue of new work
- the worker can identify the producing user from the topic message
- the assistant reply stays in the same USER table conversation history
Important notes for this simple flow
- keep the topic at one partition
- because the agent writes back into the same table, its assistant insert will also create a topic message
- today, route
WHEREfilters are stored but not enforced in the publisher path, so do not rely onWHERE author = 'user'yet - instead, make the agent ignore messages where the decoded payload already has
author = 'assistant' - if you need the producing
user, consume through the HTTP Topic API or an SDK consumer, not SQLCONSUME - for the full command surface and current caveats, see SQL Topics & Consumers