Topic Consumers & ACK
@kalamdb/consumer is the worker package. Use it for topic consumption, ACKs, and retrying agents. Keep it out of browser code.
The consumer client wraps an internal @kalamdb/client instance, so workers still have query(), queryOne(), queryAll(), role-matrix executeAsUser(), login(), refreshToken(), and disconnect() available alongside the topic APIs.
For high-level row-processing agents with retries and failure hooks, see:
runConsumer() already deserializes standard KalamDB topic row payloads and unwraps legacy { row: ... } envelopes. Use a custom changeParser only for non-row payload modes or custom envelopes.
Create a worker client with the same auth options you would use for @kalamdb/client:
Consumer builder API
Copy this when you want a long-running worker loop.
Source behavior (client.ts):
- loops consume(options) until stop() is requested
- if auto_ack is set and the handler didn’t manually call ctx.ack(), the SDK auto-acks
- when there are no more messages, sleeps ~1s before polling again
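The three behaviors above can be modeled in a few lines. This is a simplified sketch, not the SDK's source: the names Msg, LoopDeps, Handler, and runLoop are hypothetical, the poll, ack, stop check, and sleep are injected so the loop can be exercised without a server, and treating a second ack() call as a no-op is an assumption.

```typescript
// Simplified model of the loop described above; NOT the SDK's implementation.
// Msg, LoopDeps, Handler and runLoop are hypothetical names.
type Msg = { offset: number; payload: unknown };

interface LoopDeps {
  consume(): Promise<Msg[]>;          // one poll; may return an empty batch
  ack(offset: number): Promise<void>; // commit up to and including offset
  shouldStop(): boolean;              // true once stop() has been requested
  sleep(ms: number): Promise<void>;   // injected so tests do not really wait
}

type Handler = (ctx: { message: Msg; ack(): Promise<void> }) => Promise<void>;

async function runLoop(deps: LoopDeps, handler: Handler, autoAck: boolean): Promise<void> {
  while (!deps.shouldStop()) {
    const batch = await deps.consume();
    if (batch.length === 0) {
      await deps.sleep(1000); // idle back-off of roughly one second
      continue;
    }
    for (const message of batch) {
      let acked = false;
      const ack = async (): Promise<void> => {
        if (acked) return; // assumption: repeated acks are a no-op
        await deps.ack(message.offset);
        acked = true;
      };
      await handler({ message, ack });
      // Auto-ack only when enabled and the handler did not ack itself.
      if (autoAck && !acked) await ack();
    }
  }
}
```

In this model the stop check happens between polls, so a batch that has already been fetched is processed to the end before the loop exits.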
Manual ACK mode
Use manual ACK when the message should be committed only after your handler finishes.
ctx contains:
- user (typed branded user identifier)
- message (topic, group, partition, offset, key, op, timestamp, decoded payload)
- ack(): Promise<void>
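A manual-ACK handler typically calls ack() as its last step, so a failure earlier in the handler leaves the message unacknowledged. The sketch below illustrates that ordering only. HandlerContext is a hypothetical, reduced stand-in for the SDK's context type (the real type carries more fields and a branded user id), and handleOrder and save are made-up names.

```typescript
// Hypothetical, reduced context shape based on the fields listed above.
// The real SDK types may differ; `user` is a plain string here, not a branded type.
interface HandlerContext<T> {
  user: string;
  message: { topic: string; offset: number; payload: T };
  ack(): Promise<void>;
}

// Ack only after the side effect has succeeded. If `save` rejects,
// `ctx.ack()` is never reached and the message stays unacknowledged.
async function handleOrder(
  ctx: HandlerContext<{ id: string }>,
  save: (id: string) => Promise<void>, // hypothetical side effect, e.g. a DB write
): Promise<void> {
  await save(ctx.message.payload.id);
  await ctx.ack();
}
```

What happens to an unacknowledged message afterwards (redelivery, retry timing) depends on the consumer group's committed offset and is not modeled here.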
Topic payload contract in handlers
ctx.message.payload is the decoded JSON payload from the topic source. In the current backend, WITH (payload = 'full') publishes the changed row object directly and injects _table with the source table id.
Most handlers can work with the payload directly:
- direct row object for current full payloads
- optional legacy wrapper object with row under payload.row if you are still consuming older message shapes
For lower-level consumer().run() handlers, use a defensive parser when you need to normalize multiple payload shapes yourself:
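One possible shape for such a parser, as a minimal sketch: it accepts the two shapes described above (a direct row object, or a legacy wrapper with the row under payload.row) and returns null for anything else. The names Row, isObject, and normalizeRow are illustrative and not part of the SDK.

```typescript
type Row = Record<string, unknown>;

// True for plain JSON objects; rejects null, arrays and primitives.
function isObject(value: unknown): value is Row {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Returns the row for either supported shape, or null when the payload is not row-like.
function normalizeRow(payload: unknown): Row | null {
  if (!isObject(payload)) return null;
  // Legacy envelope: { row: { ... } }
  const wrapped = payload["row"];
  if (isObject(wrapped)) return wrapped;
  // Current full payload: the row object itself (including _table).
  return payload;
}
```

A caveat of this heuristic: a table that has its own object-valued column named row would be mistaken for a legacy envelope, so a stricter check may be needed for such schemas.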
If your topic source uses another payload mode (key / diff), treat both as still in development. If you are testing them anyway, align parser logic with the current payload shape and remember that _table lives inside the decoded payload, not in a separate source_table field.
One-shot batch consume
If you know the payload shape ahead of time, type it once and keep the whole consume flow strongly typed:
Each message includes:
- topic, group_id, partition_id, offset
- optional key
- optional user
- optional op
- optional timestamp_ms
- decoded payload
message.value still exists as a deprecated alias for message.payload.
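To illustrate what typing the payload once buys you, here is a sketch of a message type built from the field list above. The field names come from that list, but the TypeScript types attached to them are assumptions, and TopicMessage, OrderRow, and sumTotals are hypothetical names rather than SDK exports.

```typescript
// Field names follow the list above; the concrete types are assumptions.
interface TopicMessage<T> {
  topic: string;
  group_id: string;
  partition_id: number;
  offset: number;
  key?: string;
  user?: string;
  op?: string;
  timestamp_ms?: number;
  payload: T; // decoded payload
}

// Hypothetical row shape for an orders table.
interface OrderRow {
  id: string;
  total: number;
  _table: string;
}

// With the payload typed, downstream code needs no casts or runtime guards.
function sumTotals(batch: TopicMessage<OrderRow>[]): number {
  return batch.reduce((sum, message) => sum + message.payload.total, 0);
}
```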
Low-level ack API
This acknowledges up to and including offset 42.
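The "up to and including" semantics mean an ack is cumulative: one call covers every earlier offset in that partition. The in-memory model below is only a sketch of that rule, not the server's implementation. CommittedOffset is a hypothetical name, and it assumes offsets are non-negative integers and that acking a lower offset than the current position is a no-op.

```typescript
// In-memory model of a cumulative commit position; not the server's implementation.
class CommittedOffset {
  private committed = -1; // assumption: -1 means nothing acknowledged yet

  // Acknowledge everything up to and including `offset`.
  ack(offset: number): void {
    if (offset > this.committed) this.committed = offset;
  }

  // True when `offset` is covered by an earlier cumulative ack.
  isAcked(offset: number): boolean {
    return offset <= this.committed;
  }

  // First offset that is still unacknowledged.
  next(): number {
    return this.committed + 1;
  }
}
```

In this model, after ack(42) every offset from 0 through 42 counts as acknowledged and next() returns 43.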
ConsumeRequest options
start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.
Current backend notes:
- the HTTP topic API reads one partition per request
- partition_id: 0 is the safe default today
- timeout_seconds is accepted by the API shape but does not long-poll yet; the SDK keeps polling explicitly
USER and STREAM table write pattern in workers
When consuming events and writing into TYPE = 'USER' or TYPE = 'STREAM' tables, use executeAsUser() from an authorized service, DBA, or system account:
This preserves per-user isolation in service-side writes through KalamDB’s explicit delegation boundary.
When to use which API
- SQL subscriptions: row-level table/query reactivity.
- consumer() from @kalamdb/consumer: explicit queue or partition processing with ack semantics.
- runConsumer() from @kalamdb/consumer: higher-level runtime with retry and onFailed lifecycle hooks.