Topic Consumers & ACK
Beyond SQL subscriptions, kalam-link exposes typed topic consume/ack APIs.
For high-level row-processing agents with retries and failure hooks, see runAgent()/runConsumer() under "When to use which API" at the end of this page.
Consumer builder API
const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing',
  auto_ack: true,
  batch_size: 10,
});
await handle.run(async (ctx) => {
  console.log(ctx.message.offset, ctx.message.value);
});
handle.stop();
Source behavior (client.ts):
- loops consume(options) until stop() requested
- if auto_ack and handler didn’t manually ctx.ack(), SDK auto-acks
- when no more messages, sleeps ~1s before polling again
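The loop behavior listed above can be modeled with a small synchronous simulation. This is a sketch, not the SDK's implementation: `simulateConsumerLoop`, the `SimMessage` and `SimContext` types, and the trace strings are hypothetical names, and the real loop is asynchronous and actually sleeps where this one only records a `sleep` step.

```typescript
// Hypothetical, synchronous model of the polling loop described above.
// None of these names come from kalam-link; they exist only for illustration.
type SimMessage = { offset: number; value: unknown };
type SimContext = { message: SimMessage; ack: () => void };

function simulateConsumerLoop(
  polls: SimMessage[][], // what each successive consume() call would return
  autoAck: boolean,
  handler: (ctx: SimContext) => void,
): string[] {
  const trace: string[] = [];
  for (const batch of polls) {
    if (batch.length === 0) {
      // No messages: the real SDK waits roughly 1s before polling again.
      trace.push('sleep');
      continue;
    }
    for (const message of batch) {
      let acked = false;
      const ctx: SimContext = {
        message,
        ack: () => {
          if (!acked) {
            acked = true;
            trace.push(`ack:${message.offset}`);
          }
        },
      };
      handler(ctx);
      trace.push(`handled:${message.offset}`);
      // Auto-ack only when enabled and the handler did not ack manually.
      if (autoAck && !acked) {
        acked = true;
        trace.push(`auto-ack:${message.offset}`);
      }
    }
  }
  return trace;
}

const trace = simulateConsumerLoop(
  [[{ offset: 0, value: 'a' }, { offset: 1, value: 'b' }], []],
  true,
  (ctx) => {
    if (ctx.message.offset === 0) ctx.ack(); // manual ack for the first message only
  },
);
console.log(trace);
```

With these inputs the trace is `ack:0`, `handled:0`, `handled:1`, `auto-ack:1`, `sleep`: the first message is acked by the handler, the second by the loop, and the empty poll leads to a wait.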
Manual ACK mode
const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing-manual',
  auto_ack: false,
});
await handle.run(async (ctx) => {
  await processOrder(ctx.message.value);
  await ctx.ack();
});
ctx contains:
- username (typed branded username)
- message (topic, group, partition, offset, payload)
- ack(): Promise<void>
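For reference while writing handlers, the fields listed above can be written down as a TypeScript shape together with a fake context for testing. This is a sketch under assumptions: only `ctx.username`, `ctx.message.offset`, `ctx.message.value`, and `ctx.ack()` appear in this page's examples. The type names, the `topic`, `group_id`, and `partition_id` field names, and `makeFakeContext` are guesses made for illustration, so check the SDK's exported types before relying on them.

```typescript
// Hypothetical shape of the handler context, inferred from the list above.
// Only username, message.offset, message.value, and ack() are used in this
// page's examples; every other name here is an assumption.
type SketchUsername = string & { readonly __brand: 'Username' }; // "branded" string

interface SketchMessage {
  topic: string;        // assumed field name
  group_id: string;     // assumed field name
  partition_id: number; // assumed field name
  offset: number;
  value: unknown;       // JSON payload; validate before use
}

interface SketchContext {
  username?: SketchUsername; // treated as optional here, as an assumption
  message: SketchMessage;
  ack(): Promise<void>;
}

// Build a fake context so a handler can be exercised without a server.
function makeFakeContext(
  value: unknown,
  offset = 0,
): { ctx: SketchContext; wasAcked: () => boolean } {
  let acked = false;
  const ctx: SketchContext = {
    username: 'alice' as SketchUsername,
    message: { topic: 'orders', group_id: 'billing-manual', partition_id: 0, offset, value },
    ack: async () => {
      acked = true;
    },
  };
  return { ctx, wasAcked: () => acked };
}

const fake = makeFakeContext({ id: 1 }, 7);
console.log(fake.ctx.message.offset, fake.wasAcked());
```

A handler written against `SketchContext` can then be called with `fake.ctx`, and `fake.wasAcked()` shows whether it reached its `ack()` call.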
Topic payload contract in handlers
ctx.message.value is JSON payload from the topic source. In chat-style pipelines with WITH (payload = 'full'), handlers commonly receive either:
- direct row object
- wrapper object with row under value.row
Use a defensive parser:
function extractRow(payload: unknown): { conversation_id: number; role: string; content: string } | null {
  if (!payload || typeof payload !== 'object') return null;
  const record = payload as Record<string, unknown>;
  const source = (record.row && typeof record.row === 'object'
    ? record.row
    : record) as Record<string, unknown>;
  const conversationId = Number(source.conversation_id);
  const role = source.role;
  const content = source.content;
  if (!Number.isInteger(conversationId)) return null;
  if (role !== 'user' && role !== 'assistant') return null;
  if (typeof content !== 'string') return null;
  return { conversation_id: conversationId, role, content };
}
If your topic source uses another payload mode (key / diff), align parser logic with the selected topic payload shape.
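To make the two payload shapes concrete, the sketch below builds one example of each and shows that a small unwrapping step yields the same row from both. The payload values are made up, and `unwrapRow` is a hypothetical helper that only isolates the "direct versus wrapped" decision; it does not validate fields the way a full parser should.

```typescript
// Illustrative payloads for the two shapes described above (values are made up).
const directPayload: unknown = { conversation_id: 7, role: 'user', content: 'hello' };
const wrappedPayload: unknown = { row: { conversation_id: 7, role: 'user', content: 'hello' } };

// Hypothetical helper: return the row object whether or not it is wrapped.
function unwrapRow(payload: unknown): Record<string, unknown> | null {
  if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) return null;
  const record = payload as Record<string, unknown>;
  const inner = record.row;
  // Prefer the nested row when the payload is a wrapper object.
  if (inner !== null && typeof inner === 'object' && !Array.isArray(inner)) {
    return inner as Record<string, unknown>;
  }
  return record;
}

const fromDirect = unwrapRow(directPayload);
const fromWrapped = unwrapRow(wrappedPayload);
console.log(fromDirect, fromWrapped);
```

Both calls return an object with the same `conversation_id`, `role`, and `content`, so validation code downstream can ignore which shape arrived.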
One-shot batch consume
const batch = await client.consumeBatch({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});
console.log(batch.messages, batch.next_offset, batch.has_more);
Low-level ack API
await client.ack('orders', 'billing', 0, 42);
This acknowledges up to and including offset 42.
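The interplay between batch paging and "up to and including" acknowledgement can be illustrated with an in-memory model of a single partition. This is a sketch, not kalam-link code: `PartitionModel` and `SketchBatch` are hypothetical names, and the model assumes that `next_offset` is the offset to read from next and that `has_more` is true while unread messages remain, which the source does not spell out.

```typescript
// In-memory model of one partition, for illustration only.
// PartitionModel and SketchBatch are hypothetical, not part of kalam-link.
type SketchBatch = {
  messages: { offset: number; value: unknown }[];
  next_offset: number; // assumed meaning: where the next read should start
  has_more: boolean;   // assumed meaning: unread messages remain
};

class PartitionModel {
  private readonly log: unknown[];
  private committed = -1; // highest acknowledged offset; -1 means nothing acked

  constructor(log: unknown[]) {
    this.log = log;
  }

  // Return up to batchSize messages starting at fromOffset.
  consumeBatch(fromOffset: number, batchSize: number): SketchBatch {
    const slice = this.log.slice(fromOffset, fromOffset + batchSize);
    const messages = slice.map((value, i) => ({ offset: fromOffset + i, value }));
    const next_offset = fromOffset + messages.length;
    return { messages, next_offset, has_more: next_offset < this.log.length };
  }

  // Cumulative ack: everything up to and including `offset` counts as done.
  ack(offset: number): void {
    this.committed = Math.max(this.committed, offset);
  }

  isAcked(offset: number): boolean {
    return offset <= this.committed;
  }
}

const partition = new PartitionModel(['a', 'b', 'c', 'd', 'e']);
let cursor = 0;
let pages = 0;
for (;;) {
  const page = partition.consumeBatch(cursor, 2);
  const last = page.messages[page.messages.length - 1];
  if (last !== undefined) partition.ack(last.offset); // one ack covers the whole page
  cursor = page.next_offset;
  pages += 1;
  if (!page.has_more) break;
}
console.log(pages, cursor, partition.isAcked(4));
```

Because the ack is cumulative, acknowledging only the last offset of each page is enough to mark every earlier message in that page as processed.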
ConsumeRequest options
{
  topic: string;
  group_id: string;
  start?: string;
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}
start accepts backend-supported start modes (for example earliest or latest) and can include offset-style values depending on server routing.
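As a possible companion to the option list above, the sketch below types a request against the same shape and runs a client-side sanity check before sending it. The interface name `ConsumeRequestSketch`, the function `checkConsumeRequest`, and every rule it enforces are assumptions made for illustration; the source does not document validation rules, and the server remains the authority on what it accepts.

```typescript
// Mirrors the option shape listed above. The checks below are assumptions
// made for this sketch, not rules documented by kalam-link.
interface ConsumeRequestSketch {
  topic: string;
  group_id: string;
  start?: string;
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}

// Hypothetical pre-flight check: returns a list of problems (empty when OK).
function checkConsumeRequest(req: ConsumeRequestSketch): string[] {
  const problems: string[] = [];
  if (req.topic.trim() === '') problems.push('topic must not be empty');
  if (req.group_id.trim() === '') problems.push('group_id must not be empty');

  const positiveInts: [string, number | undefined][] = [
    ['batch_size', req.batch_size],
    ['concurrency_per_partition', req.concurrency_per_partition],
  ];
  for (const [name, value] of positiveInts) {
    if (value !== undefined && (!Number.isInteger(value) || value <= 0)) {
      problems.push(`${name} must be a positive integer`);
    }
  }

  const partition = req.partition_id;
  if (partition !== undefined && (!Number.isInteger(partition) || partition < 0)) {
    problems.push('partition_id must be a non-negative integer');
  }
  return problems;
}

const okRequest: ConsumeRequestSketch = {
  topic: 'orders',
  group_id: 'billing',
  start: 'earliest',
  batch_size: 20,
};
console.log(checkConsumeRequest(okRequest));
console.log(checkConsumeRequest({ topic: 'orders', group_id: '', batch_size: 0 }));
```

The first call reports no problems; the second reports the empty `group_id` and the non-positive `batch_size`.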
USER table write pattern in workers
When consuming events and writing into TYPE = 'USER' tables, write as the originating user:
await client.executeAsUser(
  'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
  String(ctx.username ?? ''),
  [conversationId, 'assistant', reply],
);
This preserves per-user isolation in service-side writes.
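The snippet above falls back to an empty string when `ctx.username` is missing. One option, sketched below, is to prepare the write arguments in a helper that refuses to build a write when there is no originating user, so the worker can skip or log the message instead. `buildAssistantWrite` and the `UserWrite` type are hypothetical; the SQL text and the argument order passed to `executeAsUser` are taken from the snippet above.

```typescript
// Hypothetical helper: prepare the arguments for executeAsUser, or return
// null when there is no originating user to write as.
type UserWrite = {
  sql: string;
  username: string;
  params: [number, 'assistant', string];
};

function buildAssistantWrite(
  username: string | null | undefined,
  conversationId: number,
  reply: string,
): UserWrite | null {
  const name = (username ?? '').trim();
  if (name === '') return null; // skip rather than writing as an empty user
  return {
    sql: 'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
    username: name,
    params: [conversationId, 'assistant', reply],
  };
}

const write = buildAssistantWrite('alice', 7, 'Hi there');
console.log(write);
// In a handler, assuming the call shape shown above:
//   if (write) await client.executeAsUser(write.sql, write.username, write.params);
```

Keeping the guard in a pure function makes the isolation rule easy to unit-test without a running server.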
When to use which API
- SQL subscriptions: row-level table/query reactivity.
- consumer(): explicit queue/partition/group processing with ack semantics.
- runAgent()/runConsumer(): higher-level runtime with retry + onFailed lifecycle hooks.