Skip to Content
Topic Pub/SubHTTP Topic API

HTTP Topic API

Use the HTTP topic endpoints when you want explicit consume and ack control from a worker, agent runtime, or service.

The endpoints live under /v1/api/topics and currently expose:

  • POST /consume
  • POST /ack

Auth and roles

Both endpoints require:

  • Authorization: Bearer <JWT_TOKEN>
  • role in {service, dba, system}

user role is rejected.

POST /v1/api/topics/consume

Request:

{ "topic_id": "app.user_events", "group_id": "ai-worker", "start": "Earliest", "limit": 100, "partition_id": 0, "timeout_seconds": 10 }

start accepts:

  • "Latest" (default)
  • "Earliest"
  • { "Offset": 123 }

Current consume behavior:

  • one request reads one topic_id + one group_id + one partition_id
  • if the group already has a committed offset for that partition, the handler resumes from last_acked_offset + 1 and ignores start
  • if the group has no committed offset yet, start decides where reading begins
  • the HTTP handler does not auto-ack the returned batch
  • the request model accepts timeout_seconds, but the current handler does not wait on that field yet; clients should poll explicitly
  • use partition_id: 0 unless you are deliberately experimenting with multi-partition topics

SQL CONSUME ... GROUP now follows the same resume rule for existing groups, but SQL auto-acks the last returned offset in each batch. Use HTTP when you want to control ack timing yourself.

Response:

{ "messages": [ { "topic_id": "app.user_events", "partition_id": 0, "offset": 10, "payload": "<base64>", "key": "optional-key", "timestamp_ms": 1730000000000, "user": "user_123", "op": "Insert" } ], "next_offset": 11, "has_more": false }

Response notes:

  • payload is base64 because the stored message body is binary in the API contract
  • op is Insert, Update, or Delete
  • has_more is true when the batch size hit the requested limit

POST /v1/api/topics/ack

Request:

{ "topic_id": "app.user_events", "group_id": "ai-worker", "partition_id": 0, "upto_offset": 10 }

This commits offset 10 inclusively. The next HTTP consume for the same topic, group, and partition resumes from 11.

Response:

{ "success": true, "acknowledged_offset": 10 }

Current ack behavior:

  • ACK is persisted in system.topic_offsets
  • acknowledgements are monotonic, so lower offsets do not move a group backward
  • acknowledgements are tracked per topic, group, and partition

Claim and redelivery behavior

When HTTP consume returns messages for a group, KalamDB creates an in-memory claim for that (topic, group, partition) range.

  • another consumer in the same group and partition does not get that same range immediately
  • if the batch is not acknowledged before the visibility timeout expires, the claim can expire and the range can be re-delivered
  • the default visibility timeout is 60 seconds and comes from topics.visibility_timeout_secs

Configure it in server.toml:

[topics] visibility_timeout_secs = 60

You can override it at startup with KALAMDB_TOPIC_VISIBILITY_TIMEOUT_SECS. KALAMDB_VISIBILITY_TIMEOUT_SECS is also accepted as a legacy alias.

Treat topic consumption as at-least-once delivery and keep handlers idempotent.

Current limitations

  • single-partition topics are the safe default today
  • there is no built-in consume every partition request
  • timeout_seconds is not active long-polling yet
  • for topic setup and inspection, use SQL Topics & Consumers

Error Shape

{ "error": "...", "code": "FORBIDDEN|NOT_FOUND|INTERNAL_ERROR" }

Use SQL-native topic flows in SQL Topics & Consumers.

Last updated on