Consumer Runtime

@kalamdb/consumer exposes a high-level worker runtime on top of topic consumers:

  • runConsumer() for row/change-driven handlers with retry/failure hooks
  • runAgent() as a deprecated compatibility alias for the same lifecycle model
  • createLangChainAdapter() for plugging in LangChain chat models

These APIs are exported directly from @kalamdb/consumer.

The client you pass to these helpers should be created with createConsumerClient() or otherwise satisfy the worker-side ConsumerClientLike contract.

runConsumer() quick start

TS
import { Auth } from '@kalamdb/client';
import { createConsumerClient, runConsumer } from '@kalamdb/consumer';

const client = createConsumerClient({
  url: 'http://127.0.0.1:2900',
  authProvider: async () => Auth.basic('root', 'kalamdb123'),
});

await runConsumer<Record<string, unknown>>({
  client,
  name: 'blog-summarizer',
  topic: 'blog.summarizer',
  groupId: 'blog-summarizer-agent',
  start: 'earliest',
  batchSize: 20,
  retry: {
    maxAttempts: 3,
    initialBackoffMs: 250,
    maxBackoffMs: 1500,
    multiplier: 2,
  },
  onChange: async (ctx, change) => {
    const row = change.data;
    const blogId = row.blog_id;
    if (typeof blogId !== 'string' && typeof blogId !== 'number') {
      return;
    }

    await ctx.sql(
      'UPDATE blog.blogs SET updated = NOW() WHERE blog_id = $1',
      [String(blogId)],
    );
  },
  onFailed: async (ctx) => {
    await ctx.sql(
      'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
      [ctx.runKey, String(ctx.change.data.blog_id ?? 'unknown'), String(ctx.error ?? 'unknown')],
    );
  },
  ackOnFailed: true,
});

Handler context (ConsumerRunContext)

onChange(ctx, change) receives:

  • metadata: name, topic, groupId, runKey, attempt, maxAttempts
  • source data: change, message, producing user, typed op, timestampMs, partitionId, and offset
  • helpers: sql(), queryOne(), queryAll(), ack()
  • LLM helper: llm (available when an llm adapter, and optionally a systemPrompt, is configured)

Use change.data for the decoded changed row/event. The same object is available as ctx.change, but metadata lives on ctx, not on change. The older onRow(ctx, row) and ctx.row names are deprecated aliases.

runKey

Default format is:

<name>:<topic>:<partition_id>:<offset>

Override with runKeyFactory when integrating custom idempotency keys.
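
As a rough illustration of the default format above, the key can be pictured as a colon-joined string. The helper name defaultRunKey and the RunKeyParts shape are hypothetical and not part of @kalamdb/consumer; the real runKeyFactory signature should be checked against the package typings.

```typescript
// Hypothetical helper that mirrors the documented default format:
// <name>:<topic>:<partition_id>:<offset>
interface RunKeyParts {
  name: string;
  topic: string;
  partitionId: number;
  offset: number;
}

function defaultRunKey(parts: RunKeyParts): string {
  return [parts.name, parts.topic, parts.partitionId, parts.offset].join(':');
}

console.log(
  defaultRunKey({ name: 'blog-summarizer', topic: 'blog.summarizer', partitionId: 0, offset: 42 }),
);
// prints "blog-summarizer:blog.summarizer:0:42"
```

Because the key is derived only from the worker name and the message position, a redelivered message produces the same key, which is what makes it usable for idempotency.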

Retry and ACK behavior

runConsumer() always consumes with manual ack semantics (auto_ack: false) and handles acking explicitly:

  • onChange succeeds: message is acked.
  • onChange throws: retried based on retry policy.
  • retries exhausted:
    • if onFailed is missing, message is not acked.
    • if onFailed succeeds and ackOnFailed !== false, message is acked.
    • if onFailed throws, message is not acked.
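
The rules above can be restated as a small pure function. This is only a sketch of the documented behavior, not the library's implementation; RunOutcome and shouldAck are hypothetical names introduced for illustration.

```typescript
// Outcome of one message after the handler and any retries have run.
interface RunOutcome {
  handlerSucceeded: boolean;  // onChange eventually returned without throwing
  hasOnFailed: boolean;       // an onFailed hook was configured
  onFailedSucceeded: boolean; // onFailed ran and did not throw
  ackOnFailed?: boolean;      // mirrors the ackOnFailed option
}

// Hypothetical restatement of the documented ack rules.
function shouldAck(outcome: RunOutcome): boolean {
  if (outcome.handlerSucceeded) return true;   // success: always acked
  if (!outcome.hasOnFailed) return false;      // no failure hook: left unacked
  if (!outcome.onFailedSucceeded) return false; // failure hook threw: left unacked
  return outcome.ackOnFailed !== false;        // acked unless explicitly disabled
}
```

Read this way, a message is left unacked whenever the terminal failure was not recorded successfully, so it stays available for redelivery.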

Hooks:

  • onRetry({ error, attempt, maxAttempts, backoffMs, runKey, message })
  • onError({ error, runKey, message })

Retry policy options

The retry object supports additional tuning:

  • jitterRatio — add randomness to backoff (0 disables jitter)
  • shouldRetry(error, attempt) — classify retryable failures (defaults to retrying)

TS
retry: {
  maxAttempts: 5,
  initialBackoffMs: 250,
  maxBackoffMs: 5_000,
  multiplier: 2,
  jitterRatio: 0.1,
  shouldRetry: (error) => {
    // Example: don't retry on validation errors
    return !String(error).includes('invalid-input');
  },
}
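
To get a feel for how these options interact, the sketch below computes a capped exponential delay with symmetric jitter. The exact formula the runtime uses is not stated here, so both the formula and the names RetryPolicy and backoffDelayMs are assumptions made for illustration.

```typescript
// Subset of the retry options used by this sketch.
interface RetryPolicy {
  initialBackoffMs: number;
  maxBackoffMs: number;
  multiplier: number;
  jitterRatio?: number;
}

// Assumed formula: initial * multiplier^(attempt - 1), capped at maxBackoffMs,
// then shifted by up to ±jitterRatio of that value. `attempt` is 1-based.
function backoffDelayMs(
  policy: RetryPolicy,
  attempt: number,
  random: () => number = Math.random,
): number {
  const uncapped = policy.initialBackoffMs * Math.pow(policy.multiplier, attempt - 1);
  const base = Math.min(policy.maxBackoffMs, uncapped);
  const jitterRatio = policy.jitterRatio ?? 0;
  // random() is in [0, 1), so (random() * 2 - 1) is in [-1, 1).
  const jitter = base * jitterRatio * (random() * 2 - 1);
  return Math.max(0, Math.round(base + jitter));
}

const policy: RetryPolicy = { initialBackoffMs: 250, maxBackoffMs: 5_000, multiplier: 2, jitterRatio: 0 };
console.log([1, 2, 3, 4, 5, 6].map((attempt) => backoffDelayMs(policy, attempt)));
// logs the delays 250, 500, 1000, 2000, 4000, 5000
```

With jitter disabled the delays double until they reach maxBackoffMs. A non-zero jitterRatio spreads retries from many workers so they do not all fire at the same instant.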

Other useful runConsumer options include partitionId, timeoutSeconds, and stopSignal for graceful shutdown.

runConsumer() also supervises the underlying consumer loop. If the server is temporarily down or the connection drops, it retries with exponential backoff and jitter until stopSignal aborts. Configure this with connectionRetry.
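
A minimal sketch of wiring a shutdown signal follows, assuming stopSignal accepts a standard AbortSignal (the phrase "until stopSignal aborts" suggests this, but the typings should be checked). SignalSource and stopSignalFrom are hypothetical names, not part of @kalamdb/consumer.

```typescript
// Minimal event-source shape, so the helper can be exercised with a test double.
interface SignalSource {
  once(event: string, listener: () => void): unknown;
}

// Returns an AbortSignal that aborts the first time any listed event fires.
function stopSignalFrom(
  source: SignalSource,
  events: string[] = ['SIGINT', 'SIGTERM'],
): AbortSignal {
  const controller = new AbortController();
  for (const event of events) {
    // abort() is idempotent, so a second event after the first is harmless.
    source.once(event, () => controller.abort());
  }
  return controller.signal;
}
```

In a Node.js worker the event source would typically be the process object, and the returned signal would be passed as stopSignal so that a container stop request ends the consumer loop instead of killing it mid-message.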

Row parsing

By default, the runtime parses KalamDB topic rows like this:

  • it reads message.payload
  • if message.payload has a nested row object, it uses payload.row for backward compatibility
  • otherwise it uses message.payload directly

That means apps using the ORM generator can pass the generated row type directly and skip parser boilerplate:

TS
import { runConsumer } from '@kalamdb/consumer';
import type { ChatDemoMessages } from './schema.generated';

await runConsumer<ChatDemoMessages>({
  client,
  name: 'chat-demo-agent',
  topic: 'chat_demo.ai_inbox',
  groupId: 'chat-demo-agent',
  onChange: async (_ctx, change) => {
    const row = change.data;
    if (row.role !== 'user') return;
    console.log(row.content);
  },
});

You can override with changeParser(message):

TS
changeParser: (message) => {
  const payload = message.payload as Record<string, unknown> | null;
  if (!payload) return null;
  if (typeof payload.row === 'object' && payload.row) return payload.row as Record<string, unknown>;
  return payload;
}

Return null only when you intentionally want to skip handler processing for a message.

Message-level handling

Use onMessage only for older message-level code that wants to read directly from ctx.message:

TS
import { runConsumer } from '@kalamdb/consumer';

type OrderEventPayload = {
  order_id: string;
  status: string;
  amount: number;
  _table: string;
};

await runConsumer<OrderEventPayload>({
  client,
  name: 'orders-worker',
  topic: 'orders.events',
  groupId: 'orders-group',
  onMessage: async (ctx) => {
    console.log(ctx.message.offset, ctx.message.op, ctx.message.payload.status);
  },
});

Use this when you want the retry/failure lifecycle but do not need row parsing.

runConsumer() acks after successful handler completion. Call ctx.ack() only if you explicitly want to ack before returning.

LangChain integration

createLangChainAdapter() accepts a duck-typed chat model with invoke() and optional stream():

TS
import { createLangChainAdapter, runConsumer } from '@kalamdb/consumer';
import { ChatOpenAI } from '@langchain/openai';

const llm = createLangChainAdapter(
  new ChatOpenAI({ apiKey: process.env.OPENAI_API_KEY, model: 'gpt-4o-mini' }),
);

await runConsumer({
  client,
  name: 'summary',
  topic: 'blog.summarizer',
  groupId: 'summary-group',
  llm,
  systemPrompt: 'Write one concise sentence.',
  onChange: async (ctx, _change) => {
    const summary = await ctx.llm?.complete('Summarize this row');
    console.log(summary);
  },
});

ctx.llm.complete() accepts either a plain string prompt or structured message input.

Production notes

  • Prefer idempotent writes keyed by runKey.
  • Keep retry.maxAttempts bounded and tune backoff for your latency profile.
  • Use stopSignal for graceful shutdown in containerized workers.
  • Leave the default connection retry enabled for long-running workers so transient restarts do not exit the process.
  • Persist terminal failures (onFailed) to a table for replay/inspection.
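
The first note, idempotent writes keyed by runKey, can be sketched as a guard that skips work already recorded for a key. ProcessedStore, InMemoryProcessedStore, and runOnce are hypothetical names. The in-memory store is for illustration only; a real worker would more likely rely on a table with a unique constraint on the run key, so the record survives restarts and the check-and-write is atomic.

```typescript
// Hypothetical store interface for remembering which run keys were handled.
interface ProcessedStore {
  has(runKey: string): Promise<boolean>;
  mark(runKey: string): Promise<void>;
}

// Illustration only: state is lost on restart and not shared across workers.
class InMemoryProcessedStore implements ProcessedStore {
  private readonly seen = new Set<string>();

  async has(runKey: string): Promise<boolean> {
    return this.seen.has(runKey);
  }

  async mark(runKey: string): Promise<void> {
    this.seen.add(runKey);
  }
}

// Runs `effect` only if runKey has not been marked yet.
// Resolves to true when the effect ran and false when it was skipped.
// The key is marked only after the effect succeeds, so a failed effect can be retried.
async function runOnce(
  store: ProcessedStore,
  runKey: string,
  effect: () => Promise<void>,
): Promise<boolean> {
  if (await store.has(runKey)) return false;
  await effect();
  await store.mark(runKey);
  return true;
}
```

Inside a handler, the key would come from ctx.runKey, so a message that is redelivered after a crash or a missed ack does not repeat its side effect.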