HTTP Topic API

Use the HTTP topic endpoints when you want explicit consume and ack control from a worker runtime or service.

The endpoints live under /v1/api/topics and currently expose:

  • POST /consume
  • POST /ack

Auth and roles

Both endpoints require:

  • Authorization: Bearer <JWT_TOKEN>
  • role in {service, dba, system}

Requests made with the user role are rejected.
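As a rough illustration of the auth requirement, the sketch below builds (without sending) an authenticated POST for either endpoint using only the Python standard library. The base URL, the `build_topic_request` helper, and the `Content-Type` header are assumptions made for the example, not documented parts of the API.

```python
import json
import urllib.request

# Assumption: placeholder server address, not a documented default.
BASE_URL = "http://localhost:8080"


def build_topic_request(token: str, endpoint: str, body: dict,
                        base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) an authenticated POST for /consume or /ack."""
    if endpoint not in ("consume", "ack"):
        raise ValueError(f"unknown topic endpoint: {endpoint!r}")
    return urllib.request.Request(
        url=f"{base_url}/v1/api/topics/{endpoint}",
        data=json.dumps(body).encode("utf-8"),
        headers={
            # The token must belong to a service, dba, or system role.
            "Authorization": f"Bearer {token}",
            # Assumption: JSON bodies are sent with this content type.
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

The returned object can be passed to `urllib.request.urlopen` once a real server address and token are available.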

POST /v1/api/topics/consume

Request:

{
  "topic_id": "app.user_events",
  "group_id": "ai-worker",
  "start": "Earliest",
  "limit": 100,
  "partition_id": 0,
  "timeout_seconds": 10
}

start accepts:

  • "Latest" (default)
  • "Earliest"
  • { "Offset": 123 }
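The three forms mix plain strings with a single-key object, which is easy to get wrong when building bodies by hand. A minimal sketch of client-side helpers follows; the names `LATEST`, `EARLIEST`, and `at_offset` are hypothetical conveniences, not part of the API.

```python
import json

# The two string forms accepted by "start".
LATEST = "Latest"      # documented default
EARLIEST = "Earliest"


def at_offset(offset: int) -> dict:
    """Return the object form of "start" for an explicit offset."""
    return {"Offset": offset}


# Serialize each form the way it would appear in a consume request body.
for start in (LATEST, EARLIEST, at_offset(123)):
    print(json.dumps({"topic_id": "app.user_events", "start": start}))
```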

Current consume behavior:

  • one request reads one topic_id + one partition_id
  • omit group_id for stateless inspection; the request honors start on every call and does not create group claims or offsets
  • include group_id for durable consumer-group reads
  • if the group already has a committed offset for that partition, the handler resumes from last_acked_offset + 1 and ignores start
  • if the group has no committed offset yet, start decides where reading begins
  • the HTTP handler does not auto-ack the returned batch
  • the request model accepts timeout_seconds, but the current handler does not wait on that field yet; clients should poll explicitly
  • use partition_id: 0 unless you are deliberately experimenting with multi-partition topics

SQL CONSUME ... GROUP follows the same resume rule for existing groups. Both SQL and HTTP grouped consumers require explicit ACK to persist progress.
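The resume rule can be summarized as a small client-side model. This is only a sketch of the behavior described above, not the server's implementation; `effective_start` and its parameters are hypothetical names.

```python
from typing import Optional, Union

Start = Union[str, dict]


def effective_start(group_id: Optional[str],
                    last_acked_offset: Optional[int],
                    start: Start) -> Start:
    """Model where a consume call begins reading, per the documented rules."""
    if group_id is None:
        # Stateless inspection: start is honored on every call.
        return start
    if last_acked_offset is not None:
        # Existing group progress wins and start is ignored.
        return {"Offset": last_acked_offset + 1}
    # New group with no committed offset: start decides.
    return start
```

For example, a group that has acknowledged up to offset 10 resumes at offset 11 even if the request asks for "Earliest".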

Response:

{
  "messages": [
    {
      "topic_id": "app.user_events",
      "partition_id": 0,
      "offset": 10,
      "payload": "<base64>",
      "key": "optional-key",
      "timestamp_ms": 1730000000000,
      "user": "user_123",
      "op": "Insert"
    }
  ],
  "next_offset": 11,
  "has_more": false
}

Response notes:

  • payload is base64-encoded because the API contract treats the stored message body as binary
  • op is Insert, Update, or Delete
  • has_more is true when the returned batch reached the requested limit
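Since payload arrives base64-encoded, a consumer has to decode it before use. The sketch below assumes the standard (not URL-safe) base64 alphabet, which the page does not state explicitly; `decode_payloads` is a hypothetical helper name.

```python
import base64


def decode_payloads(response: dict) -> list:
    """Return (offset, raw_bytes) pairs for every message in a consume response."""
    decoded = []
    for message in response["messages"]:
        # Assumption: standard base64 alphabet with padding.
        raw = base64.b64decode(message["payload"])
        decoded.append((message["offset"], raw))
    return decoded
```

What the raw bytes contain (JSON, text, or another encoding) depends on the producer, so any further parsing is left to the caller.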

POST /v1/api/topics/ack

Request:

{
  "topic_id": "app.user_events",
  "group_id": "ai-worker",
  "partition_id": 0,
  "upto_offset": 10
}

This commits offset 10 inclusively. The next HTTP consume for the same topic, group, and partition resumes from 11.

Response:

{
  "success": true,
  "acknowledged_offset": 10
}

Current ack behavior:

  • ACK is persisted in system.topic_offsets
  • acknowledgements are monotonic, so lower offsets do not move a group backward
  • acknowledgements are tracked per topic, group, and partition
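One way to derive an ack body from a consumed batch is sketched below, together with a tiny model of the monotonic rule. Both functions are hypothetical client-side helpers; `apply_ack` only illustrates the documented behavior and is not the server's code.

```python
from typing import Optional


def build_ack_body(topic_id: str, group_id: str, partition_id: int,
                   messages: list) -> Optional[dict]:
    """Build an /ack body covering every message in a fully processed batch."""
    if not messages:
        return None  # nothing to acknowledge
    return {
        "topic_id": topic_id,
        "group_id": group_id,
        "partition_id": partition_id,
        # upto_offset is inclusive, so use the highest offset in the batch.
        "upto_offset": max(m["offset"] for m in messages),
    }


def apply_ack(committed: Optional[int], upto_offset: int) -> int:
    """Model of monotonic acks: a lower offset never moves the group backward."""
    if committed is None:
        return upto_offset
    return max(committed, upto_offset)
```

Acknowledging the highest offset assumes every message in the batch was handled successfully; a worker that fails partway through would acknowledge only up to the last message it completed.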

Claim and redelivery behavior

When HTTP consume returns messages for a group, KalamDB creates an in-memory claim for that (topic, group, partition) range.

  • another consumer in the same group and partition does not get that same range immediately
  • if the batch is not acknowledged before the visibility timeout expires, the claim can expire and the range can be re-delivered
  • the default visibility timeout is 60 seconds and comes from topics.visibility_timeout_secs

Configure it in server.toml:

[topics]
visibility_timeout_secs = 60

You can override it at startup with KALAMDB_TOPIC_VISIBILITY_TIMEOUT_SECS. KALAMDB_VISIBILITY_TIMEOUT_SECS is also accepted as a legacy alias.

Treat topic consumption as at-least-once delivery and keep handlers idempotent.
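Because an unacknowledged range can be delivered again, a handler may see the same message twice. A minimal sketch of one deduplication approach follows, keyed on (topic_id, partition_id, offset). The `IdempotentHandler` class is hypothetical, and the in-memory set is an assumption for brevity; a real worker would likely need a durable store so that deduplication survives restarts.

```python
from typing import Callable


class IdempotentHandler:
    """Wrap a handler so a redelivered message is processed at most once."""

    def __init__(self, handle: Callable[[dict], None]) -> None:
        self._handle = handle
        self._seen = set()  # in-memory only; lost when the process exits

    def process(self, message: dict) -> bool:
        """Run the handler unless this message was already handled.

        Returns True if the handler ran, False if the message was a duplicate.
        """
        key = (message["topic_id"], message["partition_id"], message["offset"])
        if key in self._seen:
            return False
        self._handle(message)
        # Record the key only after the handler succeeds, so a failed
        # message is retried on redelivery.
        self._seen.add(key)
        return True
```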

Current limitations

  • single-partition topics are the safe default today
  • there is no built-in request that consumes every partition; each request reads a single partition_id
  • timeout_seconds does not enable long polling yet
  • for topic setup and inspection, use SQL Topics & Consumers
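Since timeout_seconds does not block yet, a worker has to poll on its own schedule. The loop below is a sketch of that pattern for one partition. The `consume`, `handle`, and `ack` callables, the `idle_sleep` interval, and the `max_polls` bound are all hypothetical; they are injected so the loop can run without a server.

```python
import time
from typing import Callable, Optional


def poll_partition(consume: Callable[[], dict],
                   handle: Callable[[dict], None],
                   ack: Callable[[int], None],
                   idle_sleep: float = 1.0,
                   max_polls: Optional[int] = None,
                   sleep: Callable[[float], None] = time.sleep) -> None:
    """Poll one topic partition, handle each batch, then ack it explicitly."""
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        batch = consume()  # one POST /consume call
        messages = batch["messages"]
        if not messages:
            sleep(idle_sleep)  # nothing new; back off before polling again
            continue
        for message in messages:
            handle(message)
        # Ack inclusively up to the highest offset that was handled.
        ack(max(m["offset"] for m in messages))
        if not batch["has_more"]:
            sleep(idle_sleep)  # batch did not hit the limit; pause briefly
```

Polling again immediately when has_more is true keeps a backlog draining quickly, while the sleep avoids a tight loop against an idle topic.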

Error Shape

{
  "error": "...",
  "code": "FORBIDDEN|NOT_FOUND|INTERNAL_ERROR"
}
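A client can map this shape onto an exception type. The sketch below is one possible approach; `TopicApiError` and `parse_error` are hypothetical names, and the "UNKNOWN" fallback is a client-side placeholder for bodies that do not match the shape, not a code the server is documented to return.

```python
import json


class TopicApiError(Exception):
    """Error returned by a topic endpoint, carrying the documented fields."""

    def __init__(self, status: int, code: str, message: str) -> None:
        super().__init__(f"{status} {code}: {message}")
        self.status = status
        self.code = code
        self.message = message


def parse_error(status: int, body: bytes) -> TopicApiError:
    """Convert an HTTP error body into a TopicApiError."""
    try:
        data = json.loads(body)
    except ValueError:
        data = None  # body was not valid JSON
    if not isinstance(data, dict):
        # Fall back to the raw text when the documented shape is absent.
        return TopicApiError(status, "UNKNOWN", body.decode("utf-8", "replace"))
    return TopicApiError(status,
                         str(data.get("code", "UNKNOWN")),
                         str(data.get("error", "")))
```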

For SQL-native topic flows, see SQL Topics & Consumers.
