# Topic Consumers & ACK
`@kalamdb/consumer` is the worker package. Use it for topic consumption, ACKs, and retrying agents. Keep it out of browser code.

The consumer client wraps an internal `@kalamdb/client` instance, so workers still have `query()`, `queryOne()`, `queryAll()`, `executeAsUser()`, `login()`, `refreshToken()`, and `disconnect()` available alongside the topic APIs.
For high-level row-processing agents with retries and failure hooks, see `runAgent()` and `runConsumer()` below.

`runAgent()` already deserializes standard KalamDB topic row payloads and unwraps legacy `{ row: ... }` envelopes. Use a custom row parser only for non-row payload modes or custom envelopes.
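The envelope unwrapping described above can be sketched as a standalone helper. This is illustrative only (`runAgent()` does this internally; the helper name is not part of the SDK):

```ts
// Sketch of the legacy-envelope unwrapping described above.
// runAgent() performs this internally; this standalone helper is illustrative.
function unwrapEnvelope(payload: unknown): unknown {
  const isObject = payload !== null && typeof payload === 'object';
  // Legacy messages wrap the row as { row: ... }; unwrap it when present.
  if (isObject && 'row' in (payload as Record<string, unknown>)) {
    return (payload as { row: unknown }).row;
  }
  // Current 'full' payloads are the row object itself.
  return payload;
}
```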
Create a worker client with the same auth options you would use for `@kalamdb/client`:
```ts
import { Auth } from '@kalamdb/client';
import { createConsumerClient } from '@kalamdb/consumer';

const client = createConsumerClient({
  url: 'http://localhost:8080',
  authProvider: async () => Auth.basic('worker', 'Secret123!'),
});
```

## Consumer builder API
Copy this when you want a long-running worker loop.
```ts
const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing',
  auto_ack: true,
  batch_size: 10,
});

await handle.run(async (ctx) => {
  console.log(
    ctx.message.partition_id,
    ctx.message.offset,
    ctx.message.op,
    ctx.message.user,
    ctx.message.payload,
  );
});

handle.stop();
```

Source behavior (`client.ts`):

- loops `consume(options)` until `stop()` is requested
- if `auto_ack` is set and the handler didn't manually call `ctx.ack()`, the SDK auto-acks
- when there are no more messages, sleeps ~1s before polling again
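The loop behavior above can be sketched as a plain polling function. The names, the `consumeOnce` stand-in, and the stop flag are illustrative assumptions, not SDK APIs:

```ts
// Illustrative polling loop matching the behavior described above:
// poll, let the SDK handle/ack, and sleep ~1s when a poll returns nothing.
async function pollLoop(
  consumeOnce: () => Promise<unknown[]>, // stands in for one consume(options) call
  state: { stopped: boolean }, // stands in for stop() being requested
  idleSleepMs = 1000,
): Promise<void> {
  while (!state.stopped) {
    const messages = await consumeOnce();
    if (messages.length === 0) {
      // No messages: back off briefly before polling again.
      await new Promise((resolve) => setTimeout(resolve, idleSleepMs));
    }
  }
}
```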
## Manual ACK mode
Use manual ACK when the message should be committed only after your handler finishes.
```ts
const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing-manual',
  auto_ack: false,
});

await handle.run(async (ctx) => {
  await processOrder(ctx.message.payload);
  await ctx.ack();
});
```

`ctx` contains:

- `user` (typed branded user identifier)
- `message` (topic, group, partition, offset, key, op, timestamp, decoded payload)
- `ack(): Promise<void>`
## Topic payload contract in handlers
`ctx.message.payload` is the decoded JSON payload from the topic source. In the current backend, `WITH (payload = 'full')` publishes the changed row object directly and injects `_table` with the source table id.
Most handlers can work with the payload directly:
- direct row object for current `full` payloads
- optional legacy wrapper object with the row under `payload.row` if you are still consuming older message shapes
For lower-level `consumer().run()` handlers, use a defensive parser when you need to normalize multiple payload shapes yourself:
```ts
function extractRow(
  payload: unknown,
): { conversation_id: number; role: string; content: string } | null {
  if (!payload || typeof payload !== 'object') return null;
  const record = payload as Record<string, unknown>;
  const source = (record.row && typeof record.row === 'object'
    ? record.row
    : record) as Record<string, unknown>;
  const conversationId = Number(source.conversation_id);
  const role = source.role;
  const content = source.content;
  if (!Number.isInteger(conversationId)) return null;
  if (role !== 'user' && role !== 'assistant') return null;
  if (typeof content !== 'string') return null;
  return { conversation_id: conversationId, role, content };
}
```

If your topic source uses another payload mode (`key` / `diff`), align parser logic with the selected topic payload shape. For current backend behavior, treat `diff` as experimental and remember that `_table` lives inside the decoded payload, not in a separate `source_table` field.
## One-shot batch consume
```ts
const batch = await client.consumeBatch({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});

console.log(batch.messages, batch.next_offset, batch.has_more);
```

If you know the payload shape ahead of time, type it once and keep the whole consume flow strongly typed:
```ts
type OrderEventPayload = {
  order_id: string;
  status: string;
  amount: number;
  _table: string;
};

const batch = await client.consumeBatch<OrderEventPayload>({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});

for (const message of batch.messages) {
  console.log(message.payload.status);
}

const handle = client.consumer<OrderEventPayload>({
  topic: 'orders',
  group_id: 'billing',
});

await handle.run(async (ctx) => {
  console.log(ctx.message.payload.status);
});
```

Each message includes:
- `topic`, `group_id`, `partition_id`, `offset`
- optional `key`
- optional `user`
- optional `op`
- optional `timestamp_ms`
- decoded `payload`
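A common batch pattern is to chain `next_offset` until `has_more` is false. A minimal sketch, with a hypothetical `fetchBatch` stub standing in for the client call (the stub's numeric `start` parameter is an assumption for the example, not an SDK signature):

```ts
// Shape returned by one batch fetch, matching the fields described above.
type Batch<T> = { messages: T[]; next_offset: number; has_more: boolean };

// Drain a partition by chaining next_offset until has_more is false.
// fetchBatch is a hypothetical stand-in for one consumeBatch call.
async function drain<T>(
  fetchBatch: (start: number) => Promise<Batch<T>>,
  startOffset = 0,
): Promise<T[]> {
  const all: T[] = [];
  let offset = startOffset;
  for (;;) {
    const batch = await fetchBatch(offset);
    all.push(...batch.messages);
    if (!batch.has_more) return all;
    offset = batch.next_offset;
  }
}
```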
`message.value` still exists as a deprecated alias for `message.payload`.
## Low-level ack API
```ts
await client.ack('orders', 'billing', 0, 42);
```

This acknowledges up to and including offset 42.
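Because the ack is cumulative, a worker that finishes messages out of order should only ack the highest contiguous offset. A sketch of that bookkeeping (an illustrative helper, not part of the SDK):

```ts
// Given the last committed offset and the set of individually finished
// offsets, return the highest offset that is safe to ack cumulatively:
// acking it must not commit any offset that is still in flight.
function highestContiguousOffset(committed: number, done: Set<number>): number {
  let next = committed + 1;
  while (done.has(next)) next += 1;
  return next - 1;
}
```

For example, with `committed = 42` and finished offsets `{43, 44, 46}`, the safe ack is 44: offset 45 is still in flight, so 46 must wait even though it is done.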
## ConsumeRequest options
```ts
{
  topic: string;
  group_id: string;
  start?: string;
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}
```

`start` accepts backend-supported start modes (for example `earliest` or `latest`) and can include offset-style values depending on server routing.

Current backend notes:

- the HTTP topic API reads one partition per request
- `partition_id: 0` is the safe default today
- `timeout_seconds` is accepted by the API shape but does not long-poll yet; the SDK keeps polling explicitly
## USER table write pattern in workers
When consuming events and writing into `TYPE = 'USER'` tables, write as the originating user:
```ts
await client.executeAsUser(
  'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
  String(ctx.user ?? ''),
  [conversationId, 'assistant', reply],
);
```

This preserves per-user isolation in service-side writes.
## When to use which API
- SQL subscriptions: row-level table/query reactivity.
- `consumer()` from `@kalamdb/consumer`: explicit queue or partition processing with ack semantics.
- `runAgent()` / `runConsumer()` from `@kalamdb/consumer`: higher-level runtime with retry and `onFailed` lifecycle hooks.