
Topic Consumers & ACK

Beyond SQL subscriptions, kalam-link exposes typed topic consume/ack APIs.

For high-level row-processing agents with retries and failure hooks, see runAgent()/runConsumer().

Consumer builder API

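const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing',
  auto_ack: true,
  batch_size: 10,
});

// run() loops until stop() is requested, so keep its promise
// instead of awaiting it before stop() can be called.
const running = handle.run(async (ctx) => {
  console.log(ctx.message.offset, ctx.message.value);
});

// Later, for example during shutdown:
handle.stop();
await running;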

Source behavior (client.ts):

  • loops consume(options) until stop() is requested
  • if auto_ack is set and the handler did not call ctx.ack() itself, the SDK acks the message automatically
  • when no messages are available, sleeps ~1s before polling again
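The three behaviors above can be sketched as a small loop. This is an illustrative model, not the code in client.ts: the names runLoop, LoopDeps, Message and Ctx are hypothetical, and the consume, ack and sleep functions are injected stand-ins so the sketch runs without a server.

```typescript
// Hypothetical shapes for illustration; these are not the SDK's real types.
interface Message {
  offset: number;
  value: unknown;
}

interface Ctx {
  message: Message;
  ack(): Promise<void>;
}

interface LoopDeps {
  consume(): Promise<Message[]>;       // stand-in for consume(options)
  ack(offset: number): Promise<void>;  // stand-in for the low-level ack call
  sleep(ms: number): Promise<void>;    // injected so the delay can be faked
  shouldStop(): boolean;               // true once stop() has been requested
}

async function runLoop(
  deps: LoopDeps,
  autoAck: boolean,
  handler: (ctx: Ctx) => Promise<void>,
): Promise<void> {
  while (!deps.shouldStop()) {
    const messages = await deps.consume();

    if (messages.length === 0) {
      // Nothing to process: wait about a second before polling again.
      await deps.sleep(1000);
      continue;
    }

    for (const message of messages) {
      let acked = false;
      const ctx: Ctx = {
        message,
        ack: async () => {
          if (acked) return; // a second ack for the same message is a no-op
          acked = true;
          await deps.ack(message.offset);
        },
      };

      await handler(ctx);

      // Auto-ack only when the handler did not ack on its own.
      if (autoAck && !acked) {
        await ctx.ack();
      }
    }
  }
}
```

Tracking the acked flag per message is what lets a handler mix manual and automatic acknowledgement without producing duplicate acks.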

Manual ACK mode

const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing-manual',
  auto_ack: false,
});

await handle.run(async (ctx) => {
  await processOrder(ctx.message.value);
  await ctx.ack();
});

ctx contains:

  • username (typed branded username)
  • message (topic, group, partition, offset, payload)
  • ack(): Promise<void>

Topic payload contract in handlers

ctx.message.value is JSON payload from the topic source. In chat-style pipelines with WITH (payload = 'full'), handlers commonly receive either:

  • direct row object
  • wrapper object with row under value.row

Use a defensive parser:

function extractRow(
  payload: unknown,
): { conversation_id: number; role: string; content: string } | null {
  if (!payload || typeof payload !== 'object') return null;

  const record = payload as Record<string, unknown>;
  // Accept either a direct row object or a wrapper with the row under `row`.
  const source = (
    record.row && typeof record.row === 'object' ? record.row : record
  ) as Record<string, unknown>;

  const conversationId = Number(source.conversation_id);
  const role = source.role;
  const content = source.content;

  if (!Number.isInteger(conversationId)) return null;
  if (role !== 'user' && role !== 'assistant') return null;
  if (typeof content !== 'string') return null;

  return { conversation_id: conversationId, role, content };
}

If your topic source uses another payload mode (key / diff), align parser logic with the selected topic payload shape.

One-shot batch consume

const batch = await client.consumeBatch({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});

console.log(batch.messages, batch.next_offset, batch.has_more);
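One way to use has_more is to drain a backlog batch by batch. The sketch below is written against a hypothetical BatchClient interface that mirrors only the fields shown above, and the message field names partition and offset are assumptions. It also assumes that acking advances the group's position, so the next consumeBatch call returns later messages; check this against your server before relying on it.

```typescript
// Hypothetical minimal shapes, modeled on the fields shown above.
interface BatchMessage {
  partition: number;
  offset: number;
  value: unknown;
}

interface Batch {
  messages: BatchMessage[];
  next_offset: number;
  has_more: boolean;
}

interface BatchClient {
  consumeBatch(req: { topic: string; group_id: string; batch_size: number }): Promise<Batch>;
  ack(topic: string, group: string, partition: number, offset: number): Promise<void>;
}

async function drainTopic(
  client: BatchClient,
  topic: string,
  group: string,
  handle: (message: BatchMessage) => Promise<void>,
  batchSize = 20,
  maxBatches = 1000, // safety cap so a misbehaving server cannot loop forever
): Promise<number> {
  let processed = 0;

  for (let i = 0; i < maxBatches; i++) {
    const batch = await client.consumeBatch({ topic, group_id: group, batch_size: batchSize });

    // Track the highest processed offset per partition within this batch.
    const highest = new Map<number, number>();
    for (const message of batch.messages) {
      await handle(message);
      processed++;
      highest.set(message.partition, Math.max(highest.get(message.partition) ?? -1, message.offset));
    }

    // One ack per partition, after every message in the batch was handled.
    for (const [partition, offset] of Array.from(highest)) {
      await client.ack(topic, group, partition, offset);
    }

    if (!batch.has_more) break;
  }

  return processed;
}
```

Acking once per partition after the whole batch keeps the number of ack calls low; if the handler throws, nothing in that batch is acknowledged and the messages can be consumed again.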

Low-level ack API

await client.ack('orders', 'billing', 0, 42);

This acknowledges up to and including offset 42.
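The "up to and including" semantics can be pictured with a small local model. AckTracker below is a hypothetical class written for illustration only; the real bookkeeping lives on the server, and the rule that a lower offset never rewinds an existing commit is an assumption, not documented behavior.

```typescript
// Local model only: shows what a cumulative ack implies for earlier offsets.
class AckTracker {
  private committed = new Map<string, number>();

  private key(topic: string, group: string, partition: number): string {
    return `${topic}/${group}/${partition}`;
  }

  ack(topic: string, group: string, partition: number, offset: number): void {
    const k = this.key(topic, group, partition);
    const current = this.committed.get(k) ?? -1;
    // Assumption: acking a lower offset does not move the commit backwards.
    if (offset > current) this.committed.set(k, offset);
  }

  isAcked(topic: string, group: string, partition: number, offset: number): boolean {
    const current = this.committed.get(this.key(topic, group, partition)) ?? -1;
    // Every offset at or below the committed one counts as acknowledged.
    return offset <= current;
  }
}

const tracker = new AckTracker();
tracker.ack('orders', 'billing', 0, 42);
console.log(tracker.isAcked('orders', 'billing', 0, 10)); // true
console.log(tracker.isAcked('orders', 'billing', 0, 42)); // true
console.log(tracker.isAcked('orders', 'billing', 0, 43)); // false
```

In practice this means a worker should ack an offset only after every earlier message in that partition has been handled.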

ConsumeRequest options

{
  topic: string;
  group_id: string;
  start?: string;
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}

start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.

USER table write pattern in workers

When consuming events and writing into TYPE = 'USER' tables, write as the originating user:

await client.executeAsUser(
  'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
  String(ctx.username ?? ''),
  [conversationId, 'assistant', reply],
);

This preserves per-user isolation in service-side writes.

When to use which API

  • SQL subscriptions: row-level table/query reactivity.
  • consumer(): explicit queue/partition/group processing with ack semantics.
  • runAgent()/runConsumer(): higher-level runtime with retry + onFailed lifecycle hooks.