
Topic Consumers & ACK

@kalamdb/consumer is the worker package. Use it for topic consumption, ACKs, and retrying workers. Keep it out of browser code.

The consumer client wraps an internal @kalamdb/client instance, so workers still have query(), queryOne(), queryAll(), role-matrix executeAsUser(), login(), refreshToken(), and disconnect() available alongside the topic APIs.

For high-level row-processing workers with retries and failure hooks, use runConsumer(). It already deserializes standard KalamDB topic row payloads and unwraps legacy { row: ... } envelopes, so a custom changeParser is only needed for non-row payload modes or custom envelopes.

Create a worker client with the same auth options you would use for @kalamdb/client:

ts snippetTS
import { Auth } from '@kalamdb/client';
import { createConsumerClient } from '@kalamdb/consumer';

const client = createConsumerClient({
  url: 'http://localhost:2900',
  authProvider: async () => Auth.basic('worker', 'Secret123!'),
});

Consumer builder API

Copy this when you want a long-running worker loop.

ts snippetTS
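const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing',
  auto_ack: true,
  batch_size: 10,
});

// run() keeps looping until stop() is requested, so register the stop
// trigger (here a shutdown signal) before awaiting run().
process.once('SIGTERM', () => handle.stop());

await handle.run(async (ctx) => {
  console.log(
    ctx.message.partition_id,
    ctx.message.offset,
    ctx.message.op,
    ctx.message.user,
    ctx.message.payload,
  );
});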

Source behavior (client.ts):

  • calls consume(options) in a loop until stop() is requested
  • if auto_ack is set and the handler did not call ctx.ack() itself, the SDK acks the message automatically
  • when no more messages are available, sleeps ~1s before polling again
  • handle.run(handler, hooks?) accepts optional lifecycle hooks, such as onBatchSuccess({ nextOffset, hasMore, messageCount }), which runs after each successful poll
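The loop behavior listed above can be pictured with a small self-contained sketch. This is not the SDK's client.ts; createLoop, LoopDeps, and the injected consume, ack, and sleep functions are hypothetical stand-ins used only to illustrate the poll, auto-ack, and idle-sleep rules.

```typescript
// Hypothetical shapes for illustration; the real SDK types may differ.
type Msg = { offset: number; payload: unknown };
type Batch = { messages: Msg[] };

interface LoopDeps {
  consume: () => Promise<Batch>;          // stands in for consume(options)
  ack: (offset: number) => Promise<void>; // stands in for the ack call
  sleep: (ms: number) => Promise<void>;   // injected so the wait can be faked
  autoAck: boolean;
}

function createLoop(deps: LoopDeps) {
  let stopped = false;
  return {
    // Request the loop to end after the current iteration.
    stop(): void {
      stopped = true;
    },
    async run(
      handler: (msg: Msg, ack: () => Promise<void>) => Promise<void>,
    ): Promise<void> {
      while (!stopped) {
        const batch = await deps.consume();
        for (const msg of batch.messages) {
          let acked = false;
          const ack = async (): Promise<void> => {
            acked = true;
            await deps.ack(msg.offset);
          };
          await handler(msg, ack);
          // Auto-ack only when the handler did not ack manually.
          if (deps.autoAck && !acked) await ack();
        }
        // Nothing to process: wait roughly one second before polling again.
        if (batch.messages.length === 0) await deps.sleep(1000);
      }
    },
  };
}
```

Injecting sleep keeps the sketch testable without real delays; in a real worker the handle returned by client.consumer() already does this work for you.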

Manual ACK mode

Use manual ACK when the message should be committed only after your handler finishes.

ts snippetTS
const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing-manual',
  auto_ack: false,
});

await handle.run(async (ctx) => {
  await processOrder(ctx.message.payload);
  await ctx.ack();
});

ctx contains:

  • user (typed branded user identifier)
  • message (topic, group, partition, offset, key, op, timestamp, decoded payload)
  • ack(): Promise<void>
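In manual ACK mode it can help to keep the "process, then ack" ordering in one place. The following is a minimal sketch, assuming a context that exposes message.payload and ack() as described above; processThenAck, AckContext, and the maxAttempts parameter are hypothetical helpers, not part of @kalamdb/consumer.

```typescript
// Hypothetical minimal view of the handler context used in this sketch.
type AckContext<T> = {
  message: { payload: T };
  ack: () => Promise<void>;
};

// Runs `work` up to `maxAttempts` times and acks only after it succeeds.
// Returns false (without acking) when every attempt failed.
async function processThenAck<T>(
  ctx: AckContext<T>,
  work: (payload: T) => Promise<void>,
  maxAttempts = 3,
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await work(ctx.message.payload);
    } catch {
      continue; // leave the message un-acked and try again
    }
    await ctx.ack(); // commit only after the work finished
    return true;
  }
  return false;
}
```

A handler could then call processThenAck(ctx, processOrder) and decide what to do when it returns false. If you need retries plus failure hooks as a built-in feature, runConsumer() is the intended route.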

Topic payload contract in handlers

ctx.message.payload is the decoded JSON payload from the topic source. In the current backend, WITH (payload = 'full') publishes the changed row object directly and injects _table with the source table id.

Most handlers can work with the payload directly:

  • direct row object for current full payloads
  • optional legacy wrapper object with row under payload.row if you are still consuming older message shapes

For lower-level consumer().run() handlers, use a defensive parser when you need to normalize multiple payload shapes yourself:

ts snippetTS
function extractRow(
  payload: unknown,
): { conversation_id: number; role: string; content: string } | null {
  if (!payload || typeof payload !== 'object') return null;

  // Accept both the current direct row shape and the legacy { row: ... } wrapper.
  const record = payload as Record<string, unknown>;
  const source = (record.row && typeof record.row === 'object'
    ? record.row
    : record) as Record<string, unknown>;

  const conversationId = Number(source.conversation_id);
  const role = source.role;
  const content = source.content;

  if (!Number.isInteger(conversationId)) return null;
  if (role !== 'user' && role !== 'assistant') return null;
  if (typeof content !== 'string') return null;

  return { conversation_id: conversationId, role, content };
}

If your topic source uses another payload mode (key / diff), treat both as still in development. If you are testing them anyway, align parser logic with the current payload shape and remember that _table lives inside the decoded payload, not in a separate source_table field.
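Because _table travels inside the decoded payload, a handler that routes by source table has to pull it out of the row itself. A minimal sketch, assuming the payload is a plain JSON object; splitTable is a hypothetical helper name, not an SDK function.

```typescript
// Separates the injected _table field from the remaining row columns.
// Returns null for payloads that are not plain objects.
function splitTable(
  payload: unknown,
): { table: string | null; row: Record<string, unknown> } | null {
  if (!payload || typeof payload !== 'object' || Array.isArray(payload)) {
    return null;
  }
  const { _table, ...row } = payload as Record<string, unknown>;
  return {
    // _table is only trusted when it is a string; otherwise report null.
    table: typeof _table === 'string' ? _table : null,
    row,
  };
}
```

For example, splitTable(ctx.message.payload) lets a single handler branch on the table value while passing the row on without the extra field.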

One-shot batch consume

ts snippetTS
const batch = await client.consumeBatch({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});

console.log(batch.messages, batch.next_offset, batch.has_more);

If you know the payload shape ahead of time, type it once and keep the whole consume flow strongly typed:

ts snippetTS
type OrderEventPayload = {
  order_id: string;
  status: string;
  amount: number;
  _table: string;
};

const batch = await client.consumeBatch<OrderEventPayload>({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});

for (const message of batch.messages) {
  console.log(message.payload.status);
}

const handle = client.consumer<OrderEventPayload>({
  topic: 'orders',
  group_id: 'billing',
});

await handle.run(async (ctx) => {
  console.log(ctx.message.payload.status);
});
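A type parameter only describes the payload at compile time; it does not check what actually arrives. If the topic may carry unexpected shapes, a runtime guard can back up the type. The sketch below reuses the OrderEventPayload shape from the example above; isOrderEventPayload is a hypothetical helper, not something the SDK provides.

```typescript
type OrderEventPayload = {
  order_id: string;
  status: string;
  amount: number;
  _table: string;
};

// Runtime check that narrows an unknown payload to OrderEventPayload.
function isOrderEventPayload(value: unknown): value is OrderEventPayload {
  if (!value || typeof value !== 'object') return false;
  const record = value as Record<string, unknown>;
  const amount = record.amount;
  return (
    typeof record.order_id === 'string' &&
    typeof record.status === 'string' &&
    typeof amount === 'number' &&
    Number.isFinite(amount) &&
    typeof record._table === 'string'
  );
}
```

Inside a handler, an early `if (!isOrderEventPayload(ctx.message.payload)) return;` keeps malformed messages away from the typed code path; whether to ack or skip such messages is a decision for your worker.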

Each message includes:

  • topic, group_id, partition_id, offset
  • optional key
  • optional user
  • optional op
  • optional timestamp_ms
  • decoded payload

message.value still exists as a deprecated alias for message.payload.

Low-level ack API

ts snippetTS
await client.ack('orders', 'billing', 0, 42);

This acknowledges up to and including offset 42.
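Since an ack covers every offset up to and including the one given, a processed batch needs at most one ack per partition, at the highest offset seen. A minimal sketch of that bookkeeping follows; highestOffsets and the Positioned type are hypothetical names, and only the partition_id and offset fields come from the message shape described above.

```typescript
// The two message fields this sketch relies on.
type Positioned = { partition_id: number; offset: number };

// Returns the highest offset seen for each partition in the batch.
function highestOffsets(messages: Positioned[]): Map<number, number> {
  const highest = new Map<number, number>();
  for (const message of messages) {
    const current = highest.get(message.partition_id);
    if (current === undefined || message.offset > current) {
      highest.set(message.partition_id, message.offset);
    }
  }
  return highest;
}
```

After processing a batch, each entry of the returned map can be passed to client.ack(topic, group, partition, offset). Only do this once every message at or below that offset has actually been handled.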

ConsumeRequest options

ts snippetTS
{
  topic: string;
  group_id: string;
  start?: string;
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}

start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.

Current backend notes:

  • the HTTP topic API reads one partition per request
  • partition_id: 0 is the safe default today
  • timeout_seconds is accepted by the API shape but does not long-poll yet; the SDK keeps polling explicitly

USER and STREAM table write pattern in workers

When consuming events and writing into TYPE = 'USER' or TYPE = 'STREAM' tables, use executeAsUser() from an authorized service, DBA, or system account:

ts snippetTS
await client.executeAsUser(
  'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
  String(ctx.user ?? ''),
  [conversationId, 'assistant', reply],
);

This preserves per-user isolation in service-side writes through KalamDB’s explicit delegation boundary.
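Because user is optional on a message, a worker may prefer to fail loudly rather than attempt a delegated write with an empty user id. A minimal sketch, assuming the user value can be converted to a non-empty string; requireUserId is a hypothetical helper and not part of the SDK.

```typescript
// Returns the user id as a string, or throws when the message carries none.
function requireUserId(user: unknown): string {
  const id = user === null || user === undefined ? '' : String(user).trim();
  if (id === '') {
    throw new Error('Message has no user; refusing a delegated write.');
  }
  return id;
}
```

The result can be passed as the user argument of executeAsUser() in place of a fallback to an empty string, so a missing user surfaces as a handler error instead of a write under an unintended identity.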

When to use which API

  • SQL subscriptions: row-level table/query reactivity.
  • consumer() from @kalamdb/consumer: explicit queue or partition processing with ack semantics.
  • runConsumer() from @kalamdb/consumer: higher-level runtime with retry and onFailed lifecycle hooks. It owns ack timing internally, so there is no auto_ack option on RunConsumerOptions.