Skip to Content
Topic Consumers & ACK

Topic Consumers & ACK

@kalamdb/consumer is the worker package. Use it for topic consumption, ACKs, and retrying agents. Keep it out of browser code.

The consumer client wraps an internal @kalamdb/client instance, so workers still have query(), queryOne(), queryAll(), executeAsUser(), login(), refreshToken(), and disconnect() available alongside the topic APIs.

For high-level row-processing agents with retries and failure hooks, see:

runAgent() already deserializes standard KalamDB topic row payloads and unwraps legacy { row: ... } envelopes. Use a custom row parser only for non-row payload modes or custom envelopes.

Create a worker client with the same auth options you would use for @kalamdb/client:

import { Auth } from '@kalamdb/client'; import { createConsumerClient } from '@kalamdb/consumer'; const client = createConsumerClient({ url: 'http://localhost:8080', authProvider: async () => Auth.basic('worker', 'Secret123!'), });

Consumer builder API

Copy this when you want a long-running worker loop.

const handle = client.consumer({ topic: 'orders', group_id: 'billing', auto_ack: true, batch_size: 10, }); await handle.run(async (ctx) => { console.log( ctx.message.partition_id, ctx.message.offset, ctx.message.op, ctx.message.user, ctx.message.payload, ); }); handle.stop();

Source behavior (client.ts):

  • loops consume(options) until stop() requested
  • if auto_ack and handler didn’t manually ctx.ack(), SDK auto-acks
  • when no more messages, sleeps ~1s before polling again

Manual ACK mode

Use manual ACK when the message should be committed only after your handler finishes.

const handle = client.consumer({ topic: 'orders', group_id: 'billing-manual', auto_ack: false, }); await handle.run(async (ctx) => { await processOrder(ctx.message.payload); await ctx.ack(); });

ctx contains:

  • user (typed branded user identifier)
  • message (topic, group, partition, offset, key, op, timestamp, decoded payload)
  • ack(): Promise<void>

Topic payload contract in handlers

ctx.message.payload is the decoded JSON payload from the topic source. In the current backend, WITH (payload = 'full') publishes the changed row object directly and injects _table with the source table id.

Most handlers can work with the payload directly:

  • direct row object for current full payloads
  • optional legacy wrapper object with row under payload.row if you are still consuming older message shapes

For lower-level consumer().run() handlers, use a defensive parser when you need to normalize multiple payload shapes yourself:

function extractRow(payload: unknown): { conversation_id: number; role: string; content: string } | null { if (!payload || typeof payload !== 'object') return null; const record = payload as Record<string, unknown>; const source = (record.row && typeof record.row === 'object' ? record.row : record) as Record<string, unknown>; const conversationId = Number(source.conversation_id); const role = source.role; const content = source.content; if (!Number.isInteger(conversationId)) return null; if (role !== 'user' && role !== 'assistant') return null; if (typeof content !== 'string') return null; return { conversation_id: conversationId, role, content }; }

If your topic source uses another payload mode (key / diff), align parser logic with the selected topic payload shape. For current backend behavior, treat diff as experimental and remember that _table lives inside the decoded payload, not in a separate source_table field.

One-shot batch consume

const batch = await client.consumeBatch({ topic: 'orders', group_id: 'billing', batch_size: 20, start: 'earliest', }); console.log(batch.messages, batch.next_offset, batch.has_more);

If you know the payload shape ahead of time, type it once and keep the whole consume flow strongly typed:

type OrderEventPayload = { order_id: string; status: string; amount: number; _table: string; }; const batch = await client.consumeBatch<OrderEventPayload>({ topic: 'orders', group_id: 'billing', batch_size: 20, start: 'earliest', }); for (const message of batch.messages) { console.log(message.payload.status); } const handle = client.consumer<OrderEventPayload>({ topic: 'orders', group_id: 'billing', }); await handle.run(async (ctx) => { console.log(ctx.message.payload.status); });

Each message includes:

  • topic, group_id, partition_id, offset
  • optional key
  • optional user
  • optional op
  • optional timestamp_ms
  • decoded payload

message.value still exists as a deprecated alias for message.payload.

Low-level ack API

await client.ack('orders', 'billing', 0, 42);

This acknowledges up to and including offset 42.

ConsumeRequest options

{ topic: string; group_id: string; start?: string; partition_id?: number; batch_size?: number; timeout_seconds?: number; auto_ack?: boolean; concurrency_per_partition?: number; }

start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.

Current backend notes:

  • the HTTP topic API reads one partition per request
  • partition_id: 0 is the safe default today
  • timeout_seconds is accepted by the API shape but does not long-poll yet; the SDK keeps polling explicitly

USER table write pattern in workers

When consuming events and writing into TYPE = 'USER' tables, write as the originating user:

await client.executeAsUser( 'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)', String(ctx.user ?? ''), [conversationId, 'assistant', reply], );

This preserves per-user isolation in service-side writes.

When to use which API

  • SQL subscriptions: row-level table/query reactivity.
  • consumer() from @kalamdb/consumer: explicit queue or partition processing with ack semantics.
  • runAgent()/runConsumer() from @kalamdb/consumer: higher-level runtime with retry and onFailed lifecycle hooks.
Last updated on