Topic Consumers & ACK
@kalamdb/consumer is the worker package. Use it for topic consumption, ACKs, and retrying workers. Keep it out of browser code.
The consumer client wraps an internal @kalamdb/client instance, so workers still have query(), queryOne(), queryAll(), role-matrix executeAsUser(), login(), refreshToken(), and disconnect() available alongside the topic APIs.
For high-level row-processing workers with retries and failure hooks, see:
runConsumer() already deserializes standard KalamDB topic row payloads and unwraps legacy { row: ... } envelopes. Use a custom changeParser only for non-row payload modes or custom envelopes.
Create a worker client with the same auth options you would use for @kalamdb/client:
The consumer client now exposes the same connection lifecycle hooks as the base app client:
- onConnect(callback) for the first healthy connection and later recoveries
- onConnectionError(callback) for topic/auth/transport failures
- onError(callback) as a compatibility alias
If you do not register an error callback, createConsumerClient() logs connection failures to the console by default.
onConnectionError receives a richer worker-oriented event than the base client callback:
- message
- recoverable
- attempt
- backoffMs when a retry is scheduled
- context when the client knows which step failed
- raw error
Fatal configuration failures such as invalid URLs are marked recoverable: false. Unreachable servers and transient transport failures are reported as recoverable.
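As a rough sketch of how a worker might act on this event, the snippet below uses a local ConnectionErrorEvent interface that mirrors the fields listed above. The field types and the describeConnectionError helper are assumptions made for illustration, not the SDK's exported definitions.

```typescript
// Local stand-in for the event passed to onConnectionError.
// Field names follow the list above; the types are assumptions.
interface ConnectionErrorEvent {
  message: string;
  recoverable: boolean;
  attempt: number;
  backoffMs?: number; // present when a retry is scheduled
  context?: string;   // present when the client knows which step failed
  error: unknown;     // raw error
}

type Decision = { action: "wait" | "exit"; log: string };

// Hypothetical helper: decide whether the worker keeps waiting or gives up.
function describeConnectionError(event: ConnectionErrorEvent): Decision {
  const where = event.context ? ` during ${event.context}` : "";
  if (!event.recoverable) {
    // Fatal configuration problems will not fix themselves, so stop the worker.
    return { action: "exit", log: `fatal${where}: ${event.message}` };
  }
  const retry =
    event.backoffMs !== undefined ? `, retrying in ${event.backoffMs}ms` : "";
  return {
    action: "wait",
    log: `attempt ${event.attempt} failed${where}: ${event.message}${retry}`,
  };
}
```

A callback registered with onConnectionError could pass its event to a helper like this and exit the process only when the decision is "exit", leaving recoverable failures to the client's own retry schedule.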
Consumer builder API
Copy this when you want a long-running worker loop.
Source behavior (client.ts):
- loops consume(options) until stop() is requested
- if auto_ack is set and the handler didn’t manually call ctx.ack(), the SDK auto-acks
- when there are no more messages, sleeps ~1s before polling again
- handle.run(handler, hooks?) accepts optional lifecycle hooks such as onBatchSuccess({ nextOffset, hasMore, messageCount }) after each successful poll
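The loop behavior described above can be approximated with a small in-memory model. This is not the SDK source: runLoop, LoopOptions, commit, and stopRequested are hypothetical names, and the batch shape only borrows nextOffset and hasMore from the hook signature above.

```typescript
// In-memory stand-ins; none of these types come from @kalamdb/consumer.
interface Message { offset: number; payload: unknown }
interface Batch { messages: Message[]; nextOffset: number; hasMore: boolean }
interface Ctx { message: Message; ack(): Promise<void> }

interface LoopOptions {
  consume(fromOffset: number): Promise<Batch>; // one poll
  commit(offset: number): Promise<void>;       // records an ack
  handler(ctx: Ctx): Promise<void>;
  autoAck: boolean;
  stopRequested(): boolean;
  idleMs: number;                              // ~1000 in the described behavior
}

async function runLoop(opts: LoopOptions): Promise<void> {
  let offset = 0;
  while (!opts.stopRequested()) {
    const batch = await opts.consume(offset);
    for (const message of batch.messages) {
      let acked = false;
      const ctx: Ctx = {
        message,
        ack: async () => {
          acked = true;
          await opts.commit(message.offset);
        },
      };
      await opts.handler(ctx);
      // Auto-ack only when the handler did not ack manually.
      if (opts.autoAck && !acked) await opts.commit(message.offset);
    }
    offset = batch.nextOffset;
    // Nothing left to read: sleep before polling again.
    if (!batch.hasMore) {
      await new Promise<void>((resolve) => setTimeout(resolve, opts.idleMs));
    }
  }
}
```

The acked flag is the piece worth noticing: a manual ctx.ack() inside the handler suppresses the automatic one, so each message is committed once regardless of which path ran.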
The consumer client delegates base connection handling to its internal @kalamdb/client instance, so connection/auth failures during login, refresh, or topic auth normalization also flow through onConnectionError.
Manual ACK mode
Use manual ACK when the message should be committed only after your handler finishes.
ctx contains:
- user (typed branded user identifier)
- message (topic, group, partition, offset, key, op, timestamp, decoded payload)
- ack(): Promise<void>
Topic payload contract in handlers
ctx.message.payload is the decoded JSON payload from the topic source. In the current backend, WITH (payload = 'full') publishes the changed row object directly and injects _table with the source table id.
Most handlers can work with the payload directly:
- direct row object for current full payloads
- optional legacy wrapper object with row under payload.row if you are still consuming older message shapes
For lower-level consumer().run() handlers, use a defensive parser when you need to normalize multiple payload shapes yourself:
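A minimal sketch of such a parser follows. normalizeRow and isRecord are hypothetical helpers, not SDK exports. The sketch assumes that _table is a string and that legacy envelopes do not carry a top-level _table, so adjust the checks if your messages differ.

```typescript
type Row = Record<string, unknown>;

function isRecord(value: unknown): value is Row {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Hypothetical helper: returns the row object, or null when the payload
// is not an object at all (for example a bare string or number).
function normalizeRow(payload: unknown): Row | null {
  if (!isRecord(payload)) return null;
  // Current full shape: the row itself, tagged with _table.
  if (typeof payload["_table"] === "string") return payload;
  // Legacy shape: { row: { ... } }.
  const inner = payload["row"];
  if (isRecord(inner)) return inner;
  // Unknown object shape: hand it back unchanged and let the caller decide.
  return payload;
}
```

Checking _table before payload.row means a current row that happens to have an object column named row is not mistaken for a legacy envelope.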
If your topic source uses another payload mode (key / diff), treat both as still in development. If you are testing them anyway, align parser logic with the current payload shape and remember that _table lives inside the decoded payload, not in a separate source_table field.
One-shot batch consume
If you know the payload shape ahead of time, type it once and keep the whole consume flow strongly typed:
Each message includes:
- topic, group_id, partition_id, offset
- optional key
- optional user
- optional op
- optional timestamp_ms
- decoded payload
message.value still exists as a deprecated alias for message.payload.
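To illustrate typing the payload once, here is a sketch with a local TopicMessage interface that mirrors the fields listed above. The TypeScript types, the OrderRow shape, and the summarize helper are assumptions for illustration. Prefer the types exported by the SDK in real code.

```typescript
// Local mirror of the message fields; the field types are assumptions.
interface TopicMessage<T> {
  topic: string;
  group_id: string;
  partition_id: number;
  offset: number;
  key?: string;
  user?: string;
  op?: string;
  timestamp_ms?: number;
  payload: T;
  /** @deprecated alias for payload */
  value?: T;
}

// Hypothetical row shape for a topic fed by an orders table.
interface OrderRow { id: number; status: string; _table: string }

// Every access to message.payload below is checked against OrderRow.
function summarize(message: TopicMessage<OrderRow>): string {
  const when =
    message.timestamp_ms !== undefined
      ? new Date(message.timestamp_ms).toISOString()
      : "unknown time";
  const op = message.op ?? "?";
  const { id, status } = message.payload;
  return `${message.topic}[${message.partition_id}]@${message.offset} ${op} order ${id} (${status}) at ${when}`;
}
```

New code should read message.payload and never message.value, so the deprecated alias can disappear without touching handlers.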
Low-level ack API
This acknowledges up to and including offset 42.
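Because an ack covers every offset up to and including the one you pass, a worker that finishes messages out of order should ack only the highest offset below which everything is done. A minimal sketch of that bookkeeping follows. highestAckableOffset is a hypothetical helper, not part of the SDK, and it assumes offsets within a partition are consecutive integers.

```typescript
// Returns the largest offset N such that every offset in [start, N] is done,
// or null when start itself has not been processed yet.
// Assumption: offsets in a partition increase by exactly 1.
function highestAckableOffset(
  start: number,
  done: ReadonlySet<number>,
): number | null {
  let next = start;
  while (done.has(next)) next += 1;
  return next === start ? null : next - 1;
}
```

With offsets 40, 41, 42, and 44 processed and 43 still in flight, the helper returns 42. Acking 44 at that point would also commit the unfinished 43.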
ConsumeRequest options
start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.
Current backend notes:
- the HTTP topic API reads one partition per request
- partition_id: 0 is the safe default today
- timeout_seconds is accepted by the API shape but does not long-poll yet; the SDK keeps polling explicitly
USER and STREAM table write pattern in workers
When consuming events and writing into TYPE = 'USER' or TYPE = 'STREAM' tables, use executeAsUser() from an authorized service, DBA, or system account:
This preserves per-user isolation in service-side writes through KalamDB’s explicit delegation boundary.
For the table-type and impersonation rules behind this pattern, see /docs/server/architecture/table-types and /docs/server/sql-reference/impersonation.
When to use which API
- SQL subscriptions: row-level table/query reactivity.
- consumer() from @kalamdb/consumer: explicit queue or partition processing with ack semantics.
- runConsumer() from @kalamdb/consumer: higher-level runtime with retry, onConnect, onConnectionError, and onFailed lifecycle hooks. It owns ack timing internally, so there is no auto_ack option on RunConsumerOptions.