Skip to Content
Topic Consumers & ACK

Topic Consumers & ACK

@kalamdb/consumer is the worker package. Use it for topic consumption, ACKs, and retrying workers. Keep it out of browser code.

The consumer client wraps an internal @kalamdb/client instance, so workers still have query(), queryOne(), queryAll(), role-matrix executeAsUser(), login(), refreshToken(), and disconnect() available alongside the topic APIs.

For high-level row-processing workers with retries and failure hooks, see:

runConsumer() already deserializes standard KalamDB topic row payloads and unwraps legacy { row: ... } envelopes. Use a custom changeParser only for non-row payload modes or custom envelopes.

Create a worker client with the same auth options you would use for @kalamdb/client:

TS
import { Auth } from '@kalamdb/client';import { createConsumerClient } from '@kalamdb/consumer'; const client = createConsumerClient({  url: 'http://localhost:2900',  authProvider: async () => Auth.basic('worker', 'Secret123!'),  onConnect: () => {    console.log('worker client is ready');  },  onConnectionError: (event) => {    console.error(event.message, { recoverable: event.recoverable, attempt: event.attempt });  },});

The consumer client now exposes the same connection lifecycle hooks as the base app client:

  • onConnect(callback) for the first healthy connection and later recoveries
  • onConnectionError(callback) for topic/auth/transport failures
  • onError(callback) as a compatibility alias

If you do not register an error callback, createConsumerClient() logs connection failures to the console by default.

onConnectionError receives a richer worker-oriented event than the base client callback:

  • message
  • recoverable
  • attempt
  • backoffMs when a retry is scheduled
  • context when the client knows which step failed
  • raw error

Fatal configuration failures such as invalid URLs are marked recoverable: false. Unreachable servers and transient transport failures are reported as recoverable.

Consumer builder API

Copy this when you want a long-running worker loop.

TS
const handle = client.consumer({  topic: 'orders',  group_id: 'billing',  auto_ack: true,  batch_size: 10,}); await handle.run(async (ctx) => {  console.log(    ctx.message.partition_id,    ctx.message.offset,    ctx.message.op,    ctx.message.user,    ctx.message.payload,  );}); handle.stop();

Source behavior (client.ts):

  • loops consume(options) until stop() requested
  • if auto_ack and handler didn’t manually ctx.ack(), SDK auto-acks
  • when no more messages, sleeps ~1s before polling again
  • handle.run(handler, hooks?) accepts optional lifecycle hooks such as onBatchSuccess({ nextOffset, hasMore, messageCount }) after each successful poll

The consumer client delegates base connection handling to its internal @kalamdb/client instance, so connection/auth failures during login, refresh, or topic auth normalization also flow through onConnectionError.

Manual ACK mode

Use manual ACK when the message should be committed only after your handler finishes.

TS
const handle = client.consumer({  topic: 'orders',  group_id: 'billing-manual',  auto_ack: false,}); await handle.run(async (ctx) => {  await processOrder(ctx.message.payload);  await ctx.ack();});

ctx contains:

  • user (typed branded user identifier)
  • message (topic, group, partition, offset, key, op, timestamp, decoded payload)
  • ack(): Promise<void>

Topic payload contract in handlers

ctx.message.payload is the decoded JSON payload from the topic source. In the current backend, WITH (payload = 'full') publishes the changed row object directly and injects _table with the source table id.

Most handlers can work with the payload directly:

  • direct row object for current full payloads
  • optional legacy wrapper object with row under payload.row if you are still consuming older message shapes

For lower-level consumer().run() handlers, use a defensive parser when you need to normalize multiple payload shapes yourself:

TS
function extractRow(payload: unknown): { conversation_id: number; role: string; content: string } | null {  if (!payload || typeof payload !== 'object') return null;   const record = payload as Record<string, unknown>;  const source = (record.row && typeof record.row === 'object'    ? record.row    : record) as Record<string, unknown>;   const conversationId = Number(source.conversation_id);  const role = source.role;  const content = source.content;   if (!Number.isInteger(conversationId)) return null;  if (role !== 'user' && role !== 'assistant') return null;  if (typeof content !== 'string') return null;   return { conversation_id: conversationId, role, content };}

If your topic source uses another payload mode (key / diff), treat both as still in development. If you are testing them anyway, align parser logic with the current payload shape and remember that _table lives inside the decoded payload, not in a separate source_table field.

One-shot batch consume

TS
const batch = await client.consumeBatch({  topic: 'orders',  group_id: 'billing',  batch_size: 20,  start: 'earliest',}); console.log(batch.messages, batch.next_offset, batch.has_more);

If you know the payload shape ahead of time, type it once and keep the whole consume flow strongly typed:

TS
type OrderEventPayload = {  order_id: string;  status: string;  amount: number;  _table: string;}; const batch = await client.consumeBatch<OrderEventPayload>({  topic: 'orders',  group_id: 'billing',  batch_size: 20,  start: 'earliest',}); for (const message of batch.messages) {  console.log(message.payload.status);} const handle = client.consumer<OrderEventPayload>({  topic: 'orders',  group_id: 'billing',}); await handle.run(async (ctx) => {  console.log(ctx.message.payload.status);});

Each message includes:

  • topic, group_id, partition_id, offset
  • optional key
  • optional user
  • optional op
  • optional timestamp_ms
  • decoded payload

message.value still exists as a deprecated alias for message.payload.

Low-level ack API

TS
await client.ack('orders', 'billing', 0, 42);

This acknowledges up to and including offset 42.

ConsumeRequest options

TS
{  topic: string;  group_id: string;  start?: string;  partition_id?: number;  batch_size?: number;  timeout_seconds?: number;  auto_ack?: boolean;  concurrency_per_partition?: number;}

start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.

Current backend notes:

  • the HTTP topic API reads one partition per request
  • partition_id: 0 is the safe default today
  • timeout_seconds is accepted by the API shape but does not long-poll yet; the SDK keeps polling explicitly

USER and STREAM table write pattern in workers

When consuming events and writing into TYPE = 'USER' or TYPE = 'STREAM' tables, use executeAsUser() from an authorized service, DBA, or system account:

TS
await client.executeAsUser(  'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',  String(ctx.user ?? ''),  [conversationId, 'assistant', reply],);

This preserves per-user isolation in service-side writes through KalamDB’s explicit delegation boundary.

For the table-type and impersonation rules behind this pattern, see /docs/server/architecture/table-types and /docs/server/sql-reference/impersonation.

When to use which API

  • SQL subscriptions: row-level table/query reactivity.
  • consumer() from @kalamdb/consumer: explicit queue or partition processing with ack semantics.
  • runConsumer() from @kalamdb/consumer: higher-level runtime with retry, onConnect, onConnectionError, and onFailed lifecycle hooks. It owns ack timing internally, so there is no auto_ack option on RunConsumerOptions.
Last updated on