HTTP Topic API
Use the HTTP topic endpoints when you want explicit consume and ack control from a worker runtime or service.
The endpoints live under /v1/api/topics and currently expose:
POST /consumePOST /ack
Auth and roles
Both endpoints require:
Authorization: Bearer <JWT_TOKEN>- role in
{service, dba, system}
user role is rejected.
POST /v1/api/topics/consume
Request:
{ "topic_id": "app.user_events", "group_id": "ai-worker", "start": "Earliest", "limit": 100, "partition_id": 0, "timeout_seconds": 10}start accepts:
"Latest"(default)"Earliest"{ "Offset": 123 }
Current consume behavior:
- one request reads one
topic_id+ onepartition_id - omit
group_idfor stateless inspection; the request honorsstarton every call and does not create group claims or offsets - include
group_idfor durable consumer-group reads - if the group already has a committed offset for that partition, the handler resumes from
last_acked_offset + 1and ignoresstart - if the group has no committed offset yet,
startdecides where reading begins - the HTTP handler does not auto-ack the returned batch
- the request model accepts
timeout_seconds, but the current handler does not wait on that field yet; clients should poll explicitly - use
partition_id: 0unless you are deliberately experimenting with multi-partition topics
SQL CONSUME ... GROUP follows the same resume rule for existing groups. Both SQL and HTTP grouped consumers require explicit ACK to persist progress.
Response:
{ "messages": [ { "topic_id": "app.user_events", "partition_id": 0, "offset": 10, "payload": "<base64>", "key": "optional-key", "timestamp_ms": 1730000000000, "user": "user_123", "op": "Insert" } ], "next_offset": 11, "has_more": false}Response notes:
payloadis base64 because the stored message body is binary in the API contractopisInsert,Update, orDeletehas_moreistruewhen the batch size hit the requestedlimit
POST /v1/api/topics/ack
Request:
{ "topic_id": "app.user_events", "group_id": "ai-worker", "partition_id": 0, "upto_offset": 10}This commits offset 10 inclusively. The next HTTP consume for the same topic, group, and partition resumes from 11.
Response:
{ "success": true, "acknowledged_offset": 10}Current ack behavior:
ACKis persisted insystem.topic_offsets- acknowledgements are monotonic, so lower offsets do not move a group backward
- acknowledgements are tracked per topic, group, and partition
Claim and redelivery behavior
When HTTP consume returns messages for a group, KalamDB creates an in-memory claim for that (topic, group, partition) range.
- another consumer in the same group and partition does not get that same range immediately
- if the batch is not acknowledged before the visibility timeout expires, the claim can expire and the range can be re-delivered
- the default visibility timeout is
60seconds and comes fromtopics.visibility_timeout_secs
Configure it in server.toml:
[topics]visibility_timeout_secs = 60You can override it at startup with KALAMDB_TOPIC_VISIBILITY_TIMEOUT_SECS. KALAMDB_VISIBILITY_TIMEOUT_SECS is also accepted as a legacy alias.
Treat topic consumption as at-least-once delivery and keep handlers idempotent.
Current limitations
- single-partition topics are the safe default today
- there is no built-in
consume every partitionrequest timeout_secondsis not active long-polling yet- for topic setup and inspection, use SQL Topics & Consumers
Error Shape
{ "error": "...", "code": "FORBIDDEN|NOT_FOUND|INTERNAL_ERROR"}Use SQL-native topic flows in SQL Topics & Consumers.