Topic Consumers & ACK
@kalamdb/consumer is the worker package. Use it for topic consumption, ACKs, and retrying workers. Keep it out of browser code.
The consumer client wraps an internal @kalamdb/client instance, so workers still have query(), queryOne(), queryAll(), role-matrix executeAsUser(), login(), refreshToken(), and disconnect() available alongside the topic APIs.
For high-level row-processing workers with retries and failure hooks, see:
runConsumer() already deserializes standard KalamDB topic row payloads and unwraps legacy { row: ... } envelopes. Use a custom changeParser only for non-row payload modes or custom envelopes.
Create a worker client with the same auth options you would use for @kalamdb/client:
import { Auth } from '@kalamdb/client';import { createConsumerClient } from '@kalamdb/consumer'; const client = createConsumerClient({ url: 'http://localhost:2900', authProvider: async () => Auth.basic('worker', 'Secret123!'),});Consumer builder API
Copy this when you want a long-running worker loop.
const handle = client.consumer({ topic: 'orders', group_id: 'billing', auto_ack: true, batch_size: 10,}); await handle.run(async (ctx) => { console.log( ctx.message.partition_id, ctx.message.offset, ctx.message.op, ctx.message.user, ctx.message.payload, );}); handle.stop();Source behavior (client.ts):
- loops
consume(options)untilstop()requested - if
auto_ackand handler didn’t manuallyctx.ack(), SDK auto-acks - when no more messages, sleeps ~1s before polling again
handle.run(handler, hooks?)accepts optional lifecycle hooks such asonBatchSuccess({ nextOffset, hasMore, messageCount })after each successful poll
Manual ACK mode
Use manual ACK when the message should be committed only after your handler finishes.
const handle = client.consumer({ topic: 'orders', group_id: 'billing-manual', auto_ack: false,}); await handle.run(async (ctx) => { await processOrder(ctx.message.payload); await ctx.ack();});ctx contains:
user(typed branded user identifier)message(topic, group, partition, offset, key, op, timestamp, decoded payload)ack(): Promise<void>
Topic payload contract in handlers
ctx.message.payload is the decoded JSON payload from the topic source. In the current backend, WITH (payload = 'full') publishes the changed row object directly and injects _table with the source table id.
Most handlers can work with the payload directly:
- direct row object for current
fullpayloads - optional legacy wrapper object with row under
payload.rowif you are still consuming older message shapes
For lower-level consumer().run() handlers, use a defensive parser when you need to normalize multiple payload shapes yourself:
function extractRow(payload: unknown): { conversation_id: number; role: string; content: string } | null { if (!payload || typeof payload !== 'object') return null; const record = payload as Record<string, unknown>; const source = (record.row && typeof record.row === 'object' ? record.row : record) as Record<string, unknown>; const conversationId = Number(source.conversation_id); const role = source.role; const content = source.content; if (!Number.isInteger(conversationId)) return null; if (role !== 'user' && role !== 'assistant') return null; if (typeof content !== 'string') return null; return { conversation_id: conversationId, role, content };}If your topic source uses another payload mode (key / diff), treat both as still in development. If you are testing them anyway, align parser logic with the current payload shape and remember that _table lives inside the decoded payload, not in a separate source_table field.
One-shot batch consume
const batch = await client.consumeBatch({ topic: 'orders', group_id: 'billing', batch_size: 20, start: 'earliest',}); console.log(batch.messages, batch.next_offset, batch.has_more);If you know the payload shape ahead of time, type it once and keep the whole consume flow strongly typed:
type OrderEventPayload = { order_id: string; status: string; amount: number; _table: string;}; const batch = await client.consumeBatch<OrderEventPayload>({ topic: 'orders', group_id: 'billing', batch_size: 20, start: 'earliest',}); for (const message of batch.messages) { console.log(message.payload.status);} const handle = client.consumer<OrderEventPayload>({ topic: 'orders', group_id: 'billing',}); await handle.run(async (ctx) => { console.log(ctx.message.payload.status);});Each message includes:
topic,group_id,partition_id,offset- optional
key - optional
user - optional
op - optional
timestamp_ms - decoded
payload
message.value still exists as a deprecated alias for message.payload.
Low-level ack API
await client.ack('orders', 'billing', 0, 42);This acknowledges up to and including offset 42.
ConsumeRequest options
{ topic: string; group_id: string; start?: string; partition_id?: number; batch_size?: number; timeout_seconds?: number; auto_ack?: boolean; concurrency_per_partition?: number;}start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.
Current backend notes:
- the HTTP topic API reads one partition per request
partition_id: 0is the safe default todaytimeout_secondsis accepted by the API shape but does not long-poll yet; the SDK keeps polling explicitly
USER and STREAM table write pattern in workers
When consuming events and writing into TYPE = 'USER' or TYPE = 'STREAM' tables, use executeAsUser() from an authorized service, DBA, or system account:
await client.executeAsUser( 'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)', String(ctx.user ?? ''), [conversationId, 'assistant', reply],);This preserves per-user isolation in service-side writes through KalamDB’s explicit delegation boundary.
When to use which API
- SQL subscriptions: row-level table/query reactivity.
- consumer() from
@kalamdb/consumer: explicit queue or partition processing with ack semantics. - runConsumer() from
@kalamdb/consumer: higher-level runtime with retry andonFailedlifecycle hooks. It owns ack timing internally, so there is noauto_ackoption onRunConsumerOptions.