Agent Runtime
kalam-link exposes a high-level agent runtime on top of topic consumers:
runAgent()for row-driven handlers with retry/failure hooksrunConsumer()for message-driven handlers with the same lifecycle modelcreateLangChainAdapter()for plugging in LangChain chat models
These APIs are exported directly from kalam-link.
runAgent() quick start
import { Auth, createClient, runAgent } from 'kalam-link';
const client = createClient({
url: 'http://127.0.0.1:8080',
auth: Auth.basic('root', 'kalamdb123'),
});
await runAgent<Record<string, unknown>>({
client,
name: 'blog-summarizer',
topic: 'blog.summarizer',
groupId: 'blog-summarizer-agent',
start: 'earliest',
batchSize: 20,
retry: {
maxAttempts: 3,
initialBackoffMs: 250,
maxBackoffMs: 1500,
multiplier: 2,
},
onRow: async (ctx, row) => {
const blogId = row.blog_id;
if (typeof blogId !== 'string' && typeof blogId !== 'number') {
return;
}
await ctx.sql(
'UPDATE blog.blogs SET updated = NOW() WHERE blog_id = $1',
[String(blogId)],
);
},
onFailed: async (ctx) => {
await ctx.sql(
'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
[ctx.runKey, String(ctx.row.blog_id ?? 'unknown'), String(ctx.error ?? 'unknown')],
);
},
ackOnFailed: true,
});Handler context (AgentContext)
onRow(ctx, row) receives:
- metadata:
name,topic,groupId,runKey,attempt,maxAttempts - source data:
message, parsedrow, producingusername - helpers:
sql(),queryOne(),queryAll(),ack() - LLM helper:
llm(whenllmadapter + optionalsystemPromptare configured)
runKey
Default format is:
<name>:<topic>:<partition_id>:<offset>
Override with runKeyFactory when integrating custom idempotency keys.
Retry and ACK behavior
runAgent() always consumes with manual ack semantics (auto_ack: false) and handles acking explicitly:
onRowsucceeds: message is acked.onRowthrows: retried based onretrypolicy.- retries exhausted:
- if
onFailedis missing, message is not acked. - if
onFailedsucceeds andackOnFailed !== false, message is acked. - if
onFailedthrows, message is not acked.
- if
Hooks:
onRetry({ error, attempt, maxAttempts, backoffMs, runKey, message })onError({ error, runKey, message })
Retry policy options
The retry object supports additional tuning:
jitterRatio— add randomness to backoff (0 disables jitter)shouldRetry(error, attempt)— classify retryable failures (defaults to retrying)
retry: {
maxAttempts: 5,
initialBackoffMs: 250,
maxBackoffMs: 5_000,
multiplier: 2,
jitterRatio: 0.1,
shouldRetry: (error) => {
// Example: don't retry on validation errors
return !String(error).includes('invalid-input');
},
}Other useful runAgent options include partitionId, timeoutSeconds, and stopSignal for graceful shutdown.
Row parsing
By default, the runtime parses row payloads like this:
- if message value has
rowobject, it usesvalue.row - otherwise it uses message value directly
You can override with rowParser(message):
rowParser: (message) => {
const value = message.value as Record<string, unknown> | null;
if (!value) return null;
if (typeof value.row === 'object' && value.row) return value.row as Record<string, unknown>;
return value;
}Return null only when you intentionally want to skip agent handling for a message.
runConsumer() shortcut
runConsumer() is a thin wrapper over runAgent() for message-level handling:
import { runConsumer } from 'kalam-link';
await runConsumer({
client,
name: 'orders-worker',
topic: 'orders.events',
groupId: 'orders-group',
onMessage: async (ctx) => {
console.log(ctx.message.offset, ctx.message.value);
},
});Use this when you want the retry/failure lifecycle but do not need row parsing.
runConsumer() inherits runAgent() ack behavior (acks after successful handler completion). Call ctx.ack() only if you explicitly want to ack before returning.
LangChain integration
createLangChainAdapter() accepts a duck-typed chat model with invoke() and optional stream():
import { createLangChainAdapter, runAgent } from 'kalam-link';
import { ChatOpenAI } from '@langchain/openai';
const llm = createLangChainAdapter(
new ChatOpenAI({ apiKey: process.env.OPENAI_API_KEY, model: 'gpt-4o-mini' }),
);
await runAgent({
client,
name: 'summary',
topic: 'blog.summarizer',
groupId: 'summary-group',
llm,
systemPrompt: 'Write one concise sentence.',
onRow: async (ctx) => {
const summary = await ctx.llm?.complete('Summarize this row');
console.log(summary);
},
});ctx.llm.complete() accepts either a plain string prompt or structured message input.
Production notes
- Prefer idempotent writes keyed by
runKey. - Keep
retry.maxAttemptsbounded and tune backoff for your latency profile. - Use
stopSignalfor graceful shutdown in containerized workers. - Persist terminal failures (
onFailed) to a table for replay/inspection.