Consumer Runtime
@kalamdb/consumer exposes a high-level worker runtime on top of topic consumers:
- runConsumer() for row/change-driven handlers with retry/failure hooks
- runAgent() as a deprecated compatibility alias for the same lifecycle model
- createLangChainAdapter() for plugging in LangChain chat models
These APIs are exported directly from @kalamdb/consumer.
The client you pass to these helpers should be created with createConsumerClient() or otherwise satisfy the worker-side ConsumerClientLike contract.
runConsumer() quick start
    import { Auth } from '@kalamdb/client';
    import { createConsumerClient, runConsumer } from '@kalamdb/consumer';

    const client = createConsumerClient({
      url: 'http://127.0.0.1:2900',
      authProvider: async () => Auth.basic('root', 'kalamdb123'),
    });

    await runConsumer<Record<string, unknown>>({
      client,
      name: 'blog-summarizer',
      topic: 'blog.summarizer',
      groupId: 'blog-summarizer-agent',
      start: 'earliest',
      batchSize: 20,
      retry: {
        maxAttempts: 3,
        initialBackoffMs: 250,
        maxBackoffMs: 1500,
        multiplier: 2,
      },
      onChange: async (ctx, change) => {
        const row = change.data;
        const blogId = row.blog_id;
        if (typeof blogId !== 'string' && typeof blogId !== 'number') {
          return;
        }
        await ctx.sql(
          'UPDATE blog.blogs SET updated = NOW() WHERE blog_id = $1',
          [String(blogId)],
        );
      },
      onFailed: async (ctx, change) => {
        await ctx.sql(
          'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
          [ctx.runKey, String(change.data.blog_id ?? 'unknown'), String(ctx.error ?? 'unknown')],
        );
      },
      ackOnFailed: true,
    });

Handler context (ConsumerRunContext)
onChange(ctx, change) receives:
- runtime metadata: ctx.name, ctx.runKey, ctx.attempt, and ctx.maxAttempts
- source data: change.data is the decoded row/event payload
- change metadata: change.user, typed change.op, change.key, change.timestampMs, change.partitionId, change.offset, change.topic, change.groupId, and metadata-only change.message
- helpers: sql(), queryOne(), queryAll(), ack()
- LLM helper: llm (when an llm adapter and optional systemPrompt are configured)
Use change.data for the decoded changed row/event and use the other change.* fields for metadata about that same event. High-level ctx is intentionally reserved for runtime execution state and helpers, so it does not expose message, change, user, op, or offset duplicates. The high-level change.message object intentionally omits payload, deprecated value, and raw transport change fields, so the decoded row lives in exactly one place: change.data. change.user is populated only when the topic event carries a subject and is expected to be undefined for shared-table routes such as the summarizer example. The older onRow(ctx, row) alias remains deprecated; use onChange(ctx, change) or onFailed(ctx, change) for the stable shape.
Generated ORM row types use the same interface. Pass the generated row type as the first runConsumer<T>() generic, read the row from change.data, and read event metadata from change.op, change.user, change.offset, or change.message. No ORM-specific wrapper is needed.
runKey
Default format is:
<name>:<topic>:<partition_id>:<offset>
Override with runKeyFactory when integrating custom idempotency keys.
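As an illustration of the default format, the sketch below builds the same colon-separated string from its four parts. RunKeyParts and defaultRunKey are hypothetical names used only for this example; they are not exports of @kalamdb/consumer, and the exact signature of runKeyFactory should be checked against the package's type definitions.

```typescript
// Hypothetical shape holding the four parts of the default run key.
interface RunKeyParts {
  name: string;
  topic: string;
  partitionId: number;
  offset: number;
}

// Builds "<name>:<topic>:<partition_id>:<offset>" as described above.
function defaultRunKey(parts: RunKeyParts): string {
  return [parts.name, parts.topic, parts.partitionId, parts.offset].join(':');
}

const exampleKey = defaultRunKey({
  name: 'blog-summarizer',
  topic: 'blog.summarizer',
  partitionId: 0,
  offset: 42,
});
console.log(exampleKey); // blog-summarizer:blog.summarizer:0:42
```

Because the key is derived from the partition and offset, redelivery of the same message produces the same key, which is what makes it usable for idempotent writes.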
Retry and ACK behavior
runConsumer() does not expose an auto_ack option. Internally it creates the underlying low-level consumer with auto_ack: false so the runtime can control retries and ack timing explicitly:
- onChange succeeds: message is acked.
- onChange throws: retried based on retry policy.
- retries exhausted:
  - if onFailed is missing, message is not acked.
  - if onFailed succeeds and ackOnFailed !== false, message is acked.
  - if onFailed throws, message is not acked.
If you need to choose auto_ack yourself, use client.consumer() directly instead of runConsumer().
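The ack rules above can be summarized as a small decision function. This is a sketch of the documented outcomes only, not the runtime's internal code; Outcome and shouldAck are hypothetical names introduced for illustration.

```typescript
// Hypothetical summary of how processing of one message ended.
interface Outcome {
  handlerSucceeded: boolean;   // onChange returned without throwing (possibly after retries)
  hasOnFailed: boolean;        // an onFailed hook was configured
  onFailedSucceeded: boolean;  // onFailed ran and did not throw
  ackOnFailed?: boolean;       // the ackOnFailed option, if it was set
}

// Returns true when the documented rules say the message is acked.
function shouldAck(outcome: Outcome): boolean {
  if (outcome.handlerSucceeded) return true;    // success: always acked
  if (!outcome.hasOnFailed) return false;       // retries exhausted, no onFailed hook
  if (!outcome.onFailedSucceeded) return false; // onFailed threw
  return outcome.ackOnFailed !== false;         // acked unless explicitly disabled
}

console.log(shouldAck({ handlerSucceeded: false, hasOnFailed: true, onFailedSucceeded: true })); // true
```

A message that is not acked stays uncommitted for the consumer group, so a missing or throwing onFailed hook leaves it eligible for redelivery.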
Hooks:
- onRetry({ error, attempt, maxAttempts, backoffMs, runKey, message })
- onError({ error, runKey, message })
- onConnectionRetry({ error, attempt, maxAttempts, backoffMs })
- onConnectionRestored({ attempt })
- onConnectionError({ error, attempt })
Retry policy options
The retry object supports additional tuning:
- jitterRatio: add randomness to backoff (0 disables jitter)
- shouldRetry(error, attempt): classify retryable failures (defaults to retrying)

    retry: {
      maxAttempts: 5,
      initialBackoffMs: 250,
      maxBackoffMs: 5_000,
      multiplier: 2,
      jitterRatio: 0.1,
      shouldRetry: (error) => {
        // Example: don't retry on validation errors
        return !String(error).includes('invalid-input');
      },
    }

Other useful runConsumer options include partitionId, timeoutSeconds, and stopSignal for graceful shutdown.
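To make the backoff fields of the retry object concrete, the sketch below computes one plausible delay schedule: the delay starts at initialBackoffMs, grows by multiplier after each failed attempt, is capped at maxBackoffMs, and is then spread by jitterRatio. The exact formula the runtime uses is not stated on this page, so treat backoffForAttempt (a hypothetical helper) as an assumption about how such options are commonly combined.

```typescript
interface BackoffPolicy {
  initialBackoffMs: number;
  maxBackoffMs: number;
  multiplier: number;
  jitterRatio?: number;
}

// attempt is 1-based: attempt 1 yields the delay before the first retry.
// random is injectable so the schedule can be checked deterministically.
function backoffForAttempt(
  policy: BackoffPolicy,
  attempt: number,
  random: () => number = Math.random,
): number {
  const base = Math.min(
    policy.maxBackoffMs,
    policy.initialBackoffMs * Math.pow(policy.multiplier, attempt - 1),
  );
  const jitter = policy.jitterRatio ?? 0;
  // Spread the delay uniformly within +/- jitterRatio of the capped base value.
  const factor = 1 + jitter * (random() * 2 - 1);
  return Math.round(base * factor);
}

const quickStartPolicy: BackoffPolicy = {
  initialBackoffMs: 250,
  maxBackoffMs: 1500,
  multiplier: 2,
  jitterRatio: 0,
};
const schedule = [1, 2, 3, 4].map((attempt) => backoffForAttempt(quickStartPolicy, attempt));
console.log(schedule); // [ 250, 500, 1000, 1500 ]
```

With jitter disabled, the quick-start values double until they hit the 1500 ms cap; a non-zero jitterRatio keeps many workers from retrying at the same instant.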
runConsumer() also supervises the underlying consumer loop. If the server is temporarily down or the connection drops, it retries with exponential backoff and jitter until stopSignal aborts. Configure this with connectionRetry:
    connectionRetry: {
      initialBackoffMs: 500,
      maxBackoffMs: 30_000,
      multiplier: 1.8,
      jitterRatio: 0.2,
    }

Use the connection hooks when you want operators to see the worker lifecycle clearly in logs:
    await runConsumer({
      client,
      name: 'blog-summarizer',
      topic: 'blog.summarizer',
      groupId: 'blog-summarizer-agent',
      onConnectionRetry: ({ error, attempt, maxAttempts, backoffMs }) => {
        console.warn('worker cannot reach KalamDB:', String(error));
        console.warn(`retrying in ${backoffMs}ms (attempt ${maxAttempts ? `${attempt}/${maxAttempts}` : attempt})`);
      },
      onConnectionRestored: ({ attempt }) => {
        console.log(`worker reconnected after ${attempt} retry attempt${attempt === 1 ? '' : 's'}`);
      },
      onConnectionError: ({ error, attempt }) => {
        console.error(`worker stopped reconnecting after ${attempt} attempts:`, error);
      },
      onChange: async () => {},
    });

onConnectionRestored() fires once after a retriable connection failure when the runtime successfully reaches KalamDB again. With the default createConsumerClient() implementation, this happens after the first successful poll, even if that poll returns no messages.
Row parsing
By default, the runtime parses KalamDB topic rows like this:
- it reads message.payload
- if message.payload has a nested row object, it uses payload.row for backward compatibility
- otherwise it uses message.payload directly
That means apps using the ORM generator can pass the generated row type directly and skip parser boilerplate:
    import { runConsumer } from '@kalamdb/consumer';
    import type { ChatDemoMessages } from './schema.generated';

    await runConsumer<ChatDemoMessages>({
      client,
      name: 'chat-demo-agent',
      topic: 'chat_demo.ai_inbox',
      groupId: 'chat-demo-agent',
      onChange: async (_ctx, change) => {
        const row = change.data;
        if (row.role !== 'user') return;
        console.log(row.content);
      },
    });

You can override with changeParser(message):
    changeParser: (message) => {
      const payload = message.payload as Record<string, unknown> | null;
      if (!payload) return null;
      if (typeof payload.row === 'object' && payload.row) return payload.row as Record<string, unknown>;
      return payload;
    }

Return null only when you intentionally want to skip agent handling for a message.
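A custom parser can also validate the payload shape and skip anything unexpected. The following is a minimal sketch under stated assumptions: MessageLike is a hypothetical stand-in for the runtime's message type (only the payload field described above is used), and OrderRow and parseOrderRow are illustrative names, not part of the package.

```typescript
// Hypothetical stand-in for the message object passed to changeParser.
interface MessageLike {
  payload: unknown;
}

// Illustrative row shape for this example only.
interface OrderRow {
  order_id: string;
  status: string;
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Returns a typed row, or null to skip messages that do not match the shape.
function parseOrderRow(message: MessageLike): OrderRow | null {
  if (!isRecord(message.payload)) return null;
  // Prefer the nested row object when present, mirroring the default behavior.
  const candidate = isRecord(message.payload.row) ? message.payload.row : message.payload;
  const { order_id, status } = candidate;
  if (typeof order_id !== 'string' || typeof status !== 'string') return null;
  return { order_id, status };
}

console.log(parseOrderRow({ payload: { row: { order_id: 'o-1', status: 'paid' } } }));
```

It could then be supplied as changeParser: parseOrderRow, assuming the runtime's message type is structurally compatible with MessageLike. Since returning null skips handling, a parser like this silently drops malformed events; log them if that matters for your pipeline.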
Message-Level Handling
Use onMessage only for older single-argument handlers that still want the retry/failure lifecycle:
    import { runConsumer } from '@kalamdb/consumer';

    type OrderEventPayload = {
      order_id: string;
      status: string;
      amount: number;
      _table: string;
    };

    await runConsumer<OrderEventPayload>({
      client,
      name: 'orders-worker',
      topic: 'orders.events',
      groupId: 'orders-group',
      onMessage: async (_ctx, change) => {
        console.log(change.offset, change.op, change.data.status);
      },
    });

onMessage remains as a deprecated compatibility hook. New workers should use onChange(ctx, change) so change data and per-change metadata stay together.
runConsumer() acks after successful handler completion. Call ctx.ack() only if you explicitly want to ack before returning.
LangChain integration
llm and systemPrompt are still supported optional RunConsumerOptions fields. If you do not need model-assisted workers, leave them undefined and ctx.llm will be null.
createLangChainAdapter() accepts a duck-typed chat model with invoke() and optional stream():
    import { createLangChainAdapter, runConsumer } from '@kalamdb/consumer';
    import { ChatOpenAI } from '@langchain/openai';

    const llm = createLangChainAdapter(
      new ChatOpenAI({ apiKey: process.env.OPENAI_API_KEY, model: 'gpt-4o-mini' }),
    );

    await runConsumer({
      client,
      name: 'summary',
      topic: 'blog.summarizer',
      groupId: 'summary-group',
      llm,
      systemPrompt: 'Write one concise sentence.',
      onChange: async (ctx, _change) => {
        const summary = await ctx.llm?.complete('Summarize this row');
        console.log(summary);
      },
    });

ctx.llm.complete() accepts either a plain string prompt or structured message input.
Production notes
- Prefer idempotent writes keyed by runKey.
- Keep retry.maxAttempts bounded and tune backoff for your latency profile.
- Use stopSignal for graceful shutdown in containerized workers.
- Leave the default connection retry enabled for long-running workers so transient restarts do not exit the process.
- Persist terminal failures (onFailed) to a table for replay/inspection.
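For the stopSignal note above, a common pattern in containerized workers is to abort when the process receives SIGTERM or SIGINT. The sketch below assumes stopSignal accepts a standard AbortSignal, which this page does not state explicitly; createStopSignal and SignalSource are hypothetical names introduced for the example.

```typescript
// Minimal structural type; Node's `process` object is expected to satisfy it.
interface SignalSource {
  once(event: string, listener: () => void): unknown;
}

// Returns an AbortSignal that aborts the first time any listed signal arrives.
function createStopSignal(
  source: SignalSource,
  signals: string[] = ['SIGTERM', 'SIGINT'],
): AbortSignal {
  const controller = new AbortController();
  for (const name of signals) {
    source.once(name, () => controller.abort());
  }
  return controller.signal;
}

// In a Node worker this would typically be wired up as (assumed usage):
//   const stopSignal = createStopSignal(process);
//   await runConsumer({ client, name, topic, groupId, stopSignal, onChange });
```

Taking the signal source as a parameter keeps the helper easy to test with a fake emitter and avoids a hard dependency on Node globals.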