HTTP Topic API
Use the HTTP topic endpoints when you want explicit consume and ack control from a worker, agent runtime, or service.
The endpoints live under /v1/api/topics and currently expose:
POST /consumePOST /ack
Auth and roles
Both endpoints require:
Authorization: Bearer <JWT_TOKEN>- role in
{service, dba, system}
user role is rejected.
POST /v1/api/topics/consume
Request:
{
"topic_id": "app.user_events",
"group_id": "ai-worker",
"start": "Earliest",
"limit": 100,
"partition_id": 0,
"timeout_seconds": 10
}start accepts:
"Latest"(default)"Earliest"{ "Offset": 123 }
Current consume behavior:
- one request reads one
topic_id+ onegroup_id+ onepartition_id - if the group already has a committed offset for that partition, the handler resumes from
last_acked_offset + 1and ignoresstart - if the group has no committed offset yet,
startdecides where reading begins - the HTTP handler does not auto-ack the returned batch
- the request model accepts
timeout_seconds, but the current handler does not wait on that field yet; clients should poll explicitly - use
partition_id: 0unless you are deliberately experimenting with multi-partition topics
SQL CONSUME ... GROUP now follows the same resume rule for existing groups, but SQL auto-acks the last returned offset in each batch. Use HTTP when you want to control ack timing yourself.
Response:
{
"messages": [
{
"topic_id": "app.user_events",
"partition_id": 0,
"offset": 10,
"payload": "<base64>",
"key": "optional-key",
"timestamp_ms": 1730000000000,
"user": "user_123",
"op": "Insert"
}
],
"next_offset": 11,
"has_more": false
}Response notes:
payloadis base64 because the stored message body is binary in the API contractopisInsert,Update, orDeletehas_moreistruewhen the batch size hit the requestedlimit
POST /v1/api/topics/ack
Request:
{
"topic_id": "app.user_events",
"group_id": "ai-worker",
"partition_id": 0,
"upto_offset": 10
}This commits offset 10 inclusively. The next HTTP consume for the same topic, group, and partition resumes from 11.
Response:
{
"success": true,
"acknowledged_offset": 10
}Current ack behavior:
ACKis persisted insystem.topic_offsets- acknowledgements are monotonic, so lower offsets do not move a group backward
- acknowledgements are tracked per topic, group, and partition
Claim and redelivery behavior
When HTTP consume returns messages for a group, KalamDB creates an in-memory claim for that (topic, group, partition) range.
- another consumer in the same group and partition does not get that same range immediately
- if the batch is not acknowledged before the visibility timeout expires, the claim can expire and the range can be re-delivered
- the default visibility timeout is
60seconds and comes fromtopics.visibility_timeout_secs
Configure it in server.toml:
[topics]
visibility_timeout_secs = 60You can override it at startup with KALAMDB_TOPIC_VISIBILITY_TIMEOUT_SECS. KALAMDB_VISIBILITY_TIMEOUT_SECS is also accepted as a legacy alias.
Treat topic consumption as at-least-once delivery and keep handlers idempotent.
Current limitations
- single-partition topics are the safe default today
- there is no built-in
consume every partitionrequest timeout_secondsis not active long-polling yet- for topic setup and inspection, use SQL Topics & Consumers
Error Shape
{
"error": "...",
"code": "FORBIDDEN|NOT_FOUND|INTERNAL_ERROR"
}Use SQL-native topic flows in SQL Topics & Consumers.