Consumer Runtime

@kalamdb/consumer exposes a high-level worker runtime on top of topic consumers:

  • runConsumer() for row/change-driven handlers with retry/failure hooks
  • runAgent() as a deprecated compatibility alias for the same lifecycle model
  • createLangChainAdapter() for plugging in LangChain chat models

These APIs are exported directly from @kalamdb/consumer.

The client you pass to these helpers should be created with createConsumerClient() or otherwise satisfy the worker-side ConsumerClientLike contract.

runConsumer() quick start

ts snippetTS
import { Auth } from '@kalamdb/client';
import { createConsumerClient, runConsumer } from '@kalamdb/consumer';

const client = createConsumerClient({
  url: 'http://127.0.0.1:2900',
  authProvider: async () => Auth.basic('root', 'kalamdb123'),
});

await runConsumer<Record<string, unknown>>({
  client,
  name: 'blog-summarizer',
  topic: 'blog.summarizer',
  groupId: 'blog-summarizer-agent',
  start: 'earliest',
  batchSize: 20,
  retry: {
    maxAttempts: 3,
    initialBackoffMs: 250,
    maxBackoffMs: 1500,
    multiplier: 2,
  },
  onChange: async (ctx, change) => {
    const row = change.data;
    const blogId = row.blog_id;
    if (typeof blogId !== 'string' && typeof blogId !== 'number') {
      return;
    }

    await ctx.sql(
      'UPDATE blog.blogs SET updated = NOW() WHERE blog_id = $1',
      [String(blogId)],
    );
  },
  onFailed: async (ctx, change) => {
    await ctx.sql(
      'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
      [ctx.runKey, String(change.data.blog_id ?? 'unknown'), String(ctx.error ?? 'unknown')],
    );
  },
  ackOnFailed: true,
});

Handler context (ConsumerRunContext)

onChange(ctx, change) receives:

  • runtime metadata: ctx.name, ctx.runKey, ctx.attempt, and ctx.maxAttempts
  • source data: change.data is the decoded row/event payload
  • change metadata: change.user, typed change.op, change.key, change.timestampMs, change.partitionId, change.offset, change.topic, change.groupId, and metadata-only change.message
  • helpers: ctx.sql(), ctx.queryOne(), ctx.queryAll(), ctx.ack()
  • LLM helper: ctx.llm (set when an llm adapter, and optionally a systemPrompt, is configured; otherwise null)

Use change.data for the decoded changed row/event and use the other change.* fields for metadata about that same event. High-level ctx is intentionally reserved for runtime execution state and helpers, so it does not expose message, change, user, op, or offset duplicates.

The high-level change.message object intentionally omits payload, deprecated value, and raw transport change fields, so the decoded row lives in exactly one place: change.data. change.user is populated only when the topic event carries a subject and is expected to be undefined for shared-table routes such as the summarizer example.

The older onRow(ctx, row) alias remains deprecated; use onChange(ctx, change) or onFailed(ctx, change) for the stable shape.

Generated ORM row types use the same interface. Pass the generated row type as the first runConsumer<T>() generic, read the row from change.data, and read event metadata from change.op, change.user, change.offset, or change.message. No ORM-specific wrapper is needed.

runKey

Default format is:

<name>:<topic>:<partition_id>:<offset>

Override with runKeyFactory when integrating custom idempotency keys.
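As a rough illustration of the default format, the sketch below builds a key from the four documented parts and shows one way a custom key could be derived from it. The RunKeyParts shape and both function names are hypothetical. The arguments that runKeyFactory actually receives are not described on this page, so check the package types before wiring anything like this in.

```typescript
// Hypothetical shape: the four values named in the documented default format.
interface RunKeyParts {
  name: string;
  topic: string;
  partitionId: number;
  offset: number;
}

// Mirrors the documented default: <name>:<topic>:<partition_id>:<offset>
function defaultRunKey(parts: RunKeyParts): string {
  return `${parts.name}:${parts.topic}:${parts.partitionId}:${parts.offset}`;
}

// Example custom key (an assumption, not a library default): prefix the key
// with an environment tag so staging and production runs never collide.
function envScopedRunKey(env: string, parts: RunKeyParts): string {
  return `${env}/${defaultRunKey(parts)}`;
}
```

Because the default key already includes partition and offset, a custom key is mainly useful when the same topic is replayed into several environments or tenants.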

Retry and ACK behavior

runConsumer() does not expose an auto_ack option. Internally it creates the underlying low-level consumer with auto_ack: false so the runtime can control retries and ack timing explicitly:

  • onChange succeeds: message is acked.
  • onChange throws: retried based on retry policy.
  • retries exhausted:
    • if onFailed is missing, message is not acked.
    • if onFailed succeeds and ackOnFailed !== false, message is acked.
    • if onFailed throws, message is not acked.

If you need to choose auto_ack yourself, use client.consumer() directly instead of runConsumer().
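The ack rules above can be read as a small decision table. The following sketch encodes them as a pure function for illustration only; ProcessingResult and shouldAck are hypothetical names, not part of @kalamdb/consumer. Treating "onFailed succeeded but ackOnFailed is false" as not acked is an inference from the rules rather than something the list states outright.

```typescript
// Hypothetical summary of how processing of one message ended.
interface ProcessingResult {
  handlerSucceeded: boolean;   // onChange returned without throwing on some attempt
  onFailedConfigured: boolean; // an onFailed hook was provided
  onFailedSucceeded: boolean;  // onFailed returned without throwing
  ackOnFailed?: boolean;       // undefined is treated like true
}

// Returns true when, per the documented rules, the message would be acked.
function shouldAck(result: ProcessingResult): boolean {
  if (result.handlerSucceeded) return true;    // success: always acked
  if (!result.onFailedConfigured) return false; // retries exhausted, no onFailed
  if (!result.onFailedSucceeded) return false;  // onFailed itself threw
  return result.ackOnFailed !== false;          // onFailed succeeded
}
```

The practical consequence is that a message stays unacked unless either the handler or the failure hook completes cleanly, which is why persisting failures in onFailed matters.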

Hooks:

  • onRetry({ error, attempt, maxAttempts, backoffMs, runKey, message })
  • onError({ error, runKey, message })
  • onConnectionRetry({ error, attempt, maxAttempts, backoffMs })
  • onConnectionRestored({ attempt })
  • onConnectionError({ error, attempt })

Retry policy options

The retry object supports additional tuning:

  • jitterRatio — add randomness to backoff (0 disables jitter)
  • shouldRetry(error, attempt) — classify retryable failures (defaults to retrying)
ts snippetTS
retry: {
  maxAttempts: 5,
  initialBackoffMs: 250,
  maxBackoffMs: 5_000,
  multiplier: 2,
  jitterRatio: 0.1,
  shouldRetry: (error) => {
    // Example: don't retry on validation errors
    return !String(error).includes('invalid-input');
  },
}
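To get a feel for how these options interact, here is a minimal sketch of a capped exponential backoff with symmetric jitter. The exact formula the runtime uses is not documented on this page, so the order of operations (cap first, then jitter) and the backoffMs helper itself are assumptions made for illustration.

```typescript
interface BackoffOptions {
  initialBackoffMs: number;
  maxBackoffMs: number;
  multiplier: number;
  jitterRatio?: number; // 0 or undefined disables jitter
}

// Delay before retry number `attempt` (1-based).
// `random` is injectable so the result can be made deterministic in tests.
function backoffMs(
  attempt: number,
  opts: BackoffOptions,
  random: () => number = Math.random,
): number {
  // Grow geometrically: initial, initial * m, initial * m^2, ...
  const exponential = opts.initialBackoffMs * Math.pow(opts.multiplier, attempt - 1);
  // Never wait longer than the configured ceiling.
  const capped = Math.min(exponential, opts.maxBackoffMs);
  // Spread the delay within +/- jitterRatio of the capped value.
  const ratio = opts.jitterRatio ?? 0;
  const jitterFactor = 1 + ratio * (2 * random() - 1);
  return Math.round(capped * jitterFactor);
}
```

Under these assumptions, the policy shown above (250 ms initial, multiplier 2, 5 000 ms ceiling) would wait roughly 250, 500, 1 000, and 2 000 ms between its five attempts, each varied by up to 10 percent.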

Other useful runConsumer options include partitionId, timeoutSeconds, and stopSignal for graceful shutdown.
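For graceful shutdown, the usual pattern is to stop picking up new work once a signal fires while letting in-flight work finish. The sketch below shows that pattern with a standard AbortController. It assumes stopSignal accepts an AbortSignal, which this page does not state explicitly, and processUntilStopped is a hypothetical stand-in for the runtime's loop rather than a library function.

```typescript
// Hypothetical stand-in for a worker loop; runConsumer() itself is not used here.
function processUntilStopped<T>(
  items: Iterable<T>,
  signal: AbortSignal,
  handle: (item: T) => void,
): number {
  let handled = 0;
  for (const item of items) {
    // Check before each item so the item in progress always completes.
    if (signal.aborted) break;
    handle(item);
    handled++;
  }
  return handled;
}

const controller = new AbortController();
// In a container you would typically call controller.abort() from a SIGTERM
// handler and pass controller.signal as stopSignal (assumed to be an AbortSignal).
const handledCount = processUntilStopped([1, 2, 3, 4], controller.signal, (n) => {
  if (n === 2) controller.abort(); // simulate a shutdown request arriving mid-run
});
```

Here the loop handles the first two items and then exits, because the abort is observed before the third item is picked up.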

runConsumer() also supervises the underlying consumer loop. If the server is temporarily down or the connection drops, it retries with exponential backoff and jitter until stopSignal aborts. Configure this with connectionRetry:

ts snippetTS
connectionRetry: {
  initialBackoffMs: 500,
  maxBackoffMs: 30_000,
  multiplier: 1.8,
  jitterRatio: 0.2,
}

Use the connection hooks when you want operators to see the worker lifecycle clearly in logs:

ts snippetTS
await runConsumer({
  client,
  name: 'blog-summarizer',
  topic: 'blog.summarizer',
  groupId: 'blog-summarizer-agent',
  onConnectionRetry: ({ error, attempt, maxAttempts, backoffMs }) => {
    console.warn('worker cannot reach KalamDB:', String(error));
    console.warn(`retrying in ${backoffMs}ms (attempt ${maxAttempts ? `${attempt}/${maxAttempts}` : attempt})`);
  },
  onConnectionRestored: ({ attempt }) => {
    console.log(`worker reconnected after ${attempt} retry attempt${attempt === 1 ? '' : 's'}`);
  },
  onConnectionError: ({ error, attempt }) => {
    console.error(`worker stopped reconnecting after ${attempt} attempts:`, error);
  },
  onChange: async () => {},
});

onConnectionRestored() fires once after a retriable connection failure when the runtime successfully reaches KalamDB again. With the default createConsumerClient() implementation, this happens after the first successful poll, even if that poll returns no messages.

Row parsing

By default, the runtime parses KalamDB topic rows like this:

  • it reads message.payload
  • if message.payload has a nested row object, it uses payload.row for backward compatibility
  • otherwise it uses message.payload directly

That means apps using the ORM generator can pass the generated row type directly and skip parser boilerplate:

ts snippetTS
import { runConsumer } from '@kalamdb/consumer';
import type { ChatDemoMessages } from './schema.generated';

await runConsumer<ChatDemoMessages>({
  client,
  name: 'chat-demo-agent',
  topic: 'chat_demo.ai_inbox',
  groupId: 'chat-demo-agent',
  onChange: async (_ctx, change) => {
    const row = change.data;
    if (row.role !== 'user') return;
    console.log(row.content);
  },
});

You can override with changeParser(message):

ts snippetTS
changeParser: (message) => {
  const payload = message.payload as Record<string, unknown> | null;
  if (!payload) return null;
  if (typeof payload.row === 'object' && payload.row) return payload.row as Record<string, unknown>;
  return payload;
}

Return null only when you intentionally want to skip handling for a message.
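One case where returning null is deliberate is a topic that carries rows from several tables when the worker only cares about one. The sketch below is a standalone parser in that spirit. TopicMessageLike, OrderRow, the 'orders' table name, and the interpretation of _table as the source table are all assumptions for illustration; the real message type comes from the package.

```typescript
// Hypothetical minimal message shape; only the payload field is used here.
interface TopicMessageLike {
  payload: unknown;
}

type OrderRow = { order_id: string; status: string; _table: string };

// Returns a typed row for messages from the expected table, and null
// (meaning "skip this message") for anything else or anything malformed.
function parseOrderChange(message: TopicMessageLike): OrderRow | null {
  const payload = message.payload;
  if (typeof payload !== 'object' || payload === null) return null;

  const candidate = payload as Record<string, unknown>;
  if (candidate._table !== 'orders') return null; // not our table: skip on purpose
  if (typeof candidate.order_id !== 'string' || typeof candidate.status !== 'string') {
    return null; // shape does not match what the handler expects
  }
  return { order_id: candidate.order_id, status: candidate.status, _table: 'orders' };
}
```

Whether malformed payloads should be skipped silently or thrown so that they reach the retry and onFailed path is a design choice; skipping hides bad data, so throwing is often the safer default.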

Message-Level Handling

Use onMessage only for older single-argument handlers that still want the retry/failure lifecycle:

ts snippetTS
import { runConsumer } from '@kalamdb/consumer';

type OrderEventPayload = {
  order_id: string;
  status: string;
  amount: number;
  _table: string;
};

await runConsumer<OrderEventPayload>({
  client,
  name: 'orders-worker',
  topic: 'orders.events',
  groupId: 'orders-group',
  onMessage: async (_ctx, change) => {
    console.log(change.offset, change.op, change.data.status);
  },
});

onMessage remains as a deprecated compatibility hook. New workers should use onChange(ctx, change) so change data and per-change metadata stay together.

runConsumer() acks after successful handler completion. Call ctx.ack() only if you explicitly want to ack before returning.

LangChain integration

llm and systemPrompt are still supported optional RunConsumerOptions fields. If you do not need model-assisted workers, leave them undefined and ctx.llm will be null.

createLangChainAdapter() accepts a duck-typed chat model with invoke() and optional stream():

ts snippetTS
import { createLangChainAdapter, runConsumer } from '@kalamdb/consumer';
import { ChatOpenAI } from '@langchain/openai';

const llm = createLangChainAdapter(
  new ChatOpenAI({ apiKey: process.env.OPENAI_API_KEY, model: 'gpt-4o-mini' }),
);

await runConsumer({
  client,
  name: 'summary',
  topic: 'blog.summarizer',
  groupId: 'summary-group',
  llm,
  systemPrompt: 'Write one concise sentence.',
  onChange: async (ctx, _change) => {
    const summary = await ctx.llm?.complete('Summarize this row');
    console.log(summary);
  },
});

ctx.llm.complete() accepts either a plain string prompt or structured message input.

Production notes

  • Prefer idempotent writes keyed by runKey.
  • Keep retry.maxAttempts bounded and tune backoff for your latency profile.
  • Use stopSignal for graceful shutdown in containerized workers.
  • Leave the default connection retry enabled for long-running workers so transient restarts do not exit the process.
  • Persist terminal failures (onFailed) to a table for replay/inspection.
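
The first note, idempotent writes keyed by runKey, matters because a message that is retried or redelivered runs the handler again with the same key. The sketch below shows the idea with an in-memory map standing in for a table that has a unique constraint on run_key. SummaryStore is a hypothetical class for illustration; in a real worker the same check would be expressed in SQL against your own table.

```typescript
// In-memory stand-in for a table with a unique constraint on run_key.
class SummaryStore {
  private readonly rows = new Map<string, string>();

  // Writes the summary only if this runKey has not been recorded yet.
  // Returns true when a row was written, false when the call was a duplicate.
  writeOnce(runKey: string, summary: string): boolean {
    if (this.rows.has(runKey)) return false;
    this.rows.set(runKey, summary);
    return true;
  }

  get size(): number {
    return this.rows.size;
  }
}

const store = new SummaryStore();
const firstWrite = store.writeOnce('blog-summarizer:blog.summarizer:0:42', 'First summary');
// A retry of the same message carries the same runKey, so it becomes a no-op.
const retryWrite = store.writeOnce('blog-summarizer:blog.summarizer:0:42', 'Second summary');
```

With this shape, a handler that crashes after writing but before the ack can be re-run safely, since the second attempt finds the existing row and leaves it untouched.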