Agent Runtime

kalam-link exposes a high-level agent runtime on top of topic consumers:

  • runAgent() for row-driven handlers with retry/failure hooks
  • runConsumer() for message-driven handlers with the same lifecycle model
  • createLangChainAdapter() for plugging in LangChain chat models

These APIs are exported directly from kalam-link.

runAgent() quick start

import { Auth, createClient, runAgent } from 'kalam-link';

const client = createClient({
  url: 'http://127.0.0.1:8080',
  auth: Auth.basic('root', 'kalamdb123'),
});

await runAgent<Record<string, unknown>>({
  client,
  name: 'blog-summarizer',
  topic: 'blog.summarizer',
  groupId: 'blog-summarizer-agent',
  start: 'earliest',
  batchSize: 20,
  retry: {
    maxAttempts: 3,
    initialBackoffMs: 250,
    maxBackoffMs: 1500,
    multiplier: 2,
  },
  onRow: async (ctx, row) => {
    const blogId = row.blog_id;
    if (typeof blogId !== 'string' && typeof blogId !== 'number') {
      return;
    }
    await ctx.sql(
      'UPDATE blog.blogs SET updated = NOW() WHERE blog_id = $1',
      [String(blogId)],
    );
  },
  onFailed: async (ctx) => {
    await ctx.sql(
      'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
      [ctx.runKey, String(ctx.row.blog_id ?? 'unknown'), String(ctx.error ?? 'unknown')],
    );
  },
  ackOnFailed: true,
});

Handler context (AgentContext)

onRow(ctx, row) receives:

  • metadata: name, topic, groupId, runKey, attempt, maxAttempts
  • source data: message, parsed row, producing username
  • helpers: sql(), queryOne(), queryAll(), ack()
  • LLM helper: llm (available when an llm adapter is configured; systemPrompt is optional)

runKey

The default run key format is:

<name>:<topic>:<partition_id>:<offset>

Override with runKeyFactory when integrating custom idempotency keys.
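The default format can be reproduced with a trivial helper, e.g. when pre-computing keys for a failure table (a sketch; `runKeyFactory`'s exact signature is not shown here):

```typescript
// Mirrors the documented default: <name>:<topic>:<partition_id>:<offset>
function defaultRunKey(
  name: string,
  topic: string,
  partitionId: number,
  offset: number,
): string {
  return `${name}:${topic}:${partitionId}:${offset}`;
}
```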

Retry and ACK behavior

runAgent() always consumes with manual ack semantics (auto_ack: false) and handles acking explicitly:

  • onRow succeeds: message is acked.
  • onRow throws: retried based on retry policy.
  • retries exhausted:
    • if onFailed is missing, message is not acked.
    • if onFailed succeeds and ackOnFailed !== false, message is acked.
    • if onFailed throws, message is not acked.
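The exhausted-retries rules above can be condensed into a small decision function (a sketch of the documented behavior, not the runtime's actual code):

```typescript
interface ExhaustedOutcome {
  hasOnFailed: boolean;   // was an onFailed hook provided?
  onFailedThrew: boolean; // did the hook itself throw?
  ackOnFailed?: boolean;  // treated as true when omitted
}

// Returns true when the message should be acked after retries are exhausted.
function shouldAckAfterFailure(o: ExhaustedOutcome): boolean {
  if (!o.hasOnFailed) return false;  // no hook: leave message unacked
  if (o.onFailedThrew) return false; // hook failed: leave message unacked
  return o.ackOnFailed !== false;    // hook succeeded: ack unless disabled
}
```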

Hooks:

  • onRetry({ error, attempt, maxAttempts, backoffMs, runKey, message })
  • onError({ error, runKey, message })

Retry policy options

The retry object supports additional tuning:

  • jitterRatio — add randomness to backoff (0 disables jitter)
  • shouldRetry(error, attempt) — classify retryable failures (defaults to retrying)
retry: {
  maxAttempts: 5,
  initialBackoffMs: 250,
  maxBackoffMs: 5_000,
  multiplier: 2,
  jitterRatio: 0.1,
  shouldRetry: (error) => {
    // Example: don't retry on validation errors
    return !String(error).includes('invalid-input');
  },
}
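With these options, a typical exponential-backoff-with-jitter computation looks like the following sketch (the exact formula kalam-link uses internally is an assumption):

```typescript
interface BackoffOptions {
  initialBackoffMs: number;
  maxBackoffMs: number;
  multiplier: number;
  jitterRatio?: number; // 0 or omitted disables jitter
}

// Exponential backoff capped at maxBackoffMs, plus optional proportional jitter.
function backoffForAttempt(attempt: number, opts: BackoffOptions): number {
  const { initialBackoffMs, maxBackoffMs, multiplier, jitterRatio = 0 } = opts;
  const base = Math.min(maxBackoffMs, initialBackoffMs * Math.pow(multiplier, attempt - 1));
  return base + base * jitterRatio * Math.random();
}
```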

Other useful runAgent options include partitionId, timeoutSeconds, and stopSignal for graceful shutdown.
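A graceful-shutdown wiring for stopSignal might look like this sketch (assuming stopSignal accepts a standard AbortSignal; the runAgent call itself is shown in a comment for context):

```typescript
// Abort the consumer loop on SIGTERM/SIGINT so in-flight rows finish before exit.
const controller = new AbortController();
process.once('SIGTERM', () => controller.abort());
process.once('SIGINT', () => controller.abort());

// Then pass the signal to runAgent (options elided):
// await runAgent({ client, name: 'worker', topic: 't', groupId: 'g',
//                  stopSignal: controller.signal, onRow: async () => {} });
```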

Row parsing

By default, the runtime parses row payloads like this:

  • if the message value contains a row object, value.row is used
  • otherwise the message value is used directly

You can override with rowParser(message):

rowParser: (message) => {
  const value = message.value as Record<string, unknown> | null;
  if (!value) return null;
  if (typeof value.row === 'object' && value.row) {
    return value.row as Record<string, unknown>;
  }
  return value;
}

Return null only when you intentionally want to skip agent handling for a message.

runConsumer() shortcut

runConsumer() is a thin wrapper over runAgent() for message-level handling:

import { runConsumer } from 'kalam-link';

await runConsumer({
  client,
  name: 'orders-worker',
  topic: 'orders.events',
  groupId: 'orders-group',
  onMessage: async (ctx) => {
    console.log(ctx.message.offset, ctx.message.value);
  },
});

Use this when you want the retry/failure lifecycle but do not need row parsing.

runConsumer() inherits runAgent() ack behavior (acks after successful handler completion). Call ctx.ack() only if you explicitly want to ack before returning.

LangChain integration

createLangChainAdapter() accepts a duck-typed chat model with invoke() and optional stream():

import { createLangChainAdapter, runAgent } from 'kalam-link';
import { ChatOpenAI } from '@langchain/openai';

const llm = createLangChainAdapter(
  new ChatOpenAI({ apiKey: process.env.OPENAI_API_KEY, model: 'gpt-4o-mini' }),
);

await runAgent({
  client,
  name: 'summary',
  topic: 'blog.summarizer',
  groupId: 'summary-group',
  llm,
  systemPrompt: 'Write one concise sentence.',
  onRow: async (ctx) => {
    const summary = await ctx.llm?.complete('Summarize this row');
    console.log(summary);
  },
});

ctx.llm.complete() accepts either a plain string prompt or structured message input.
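Because the adapter is duck-typed, agent logic can be exercised without a real model by passing a stub that implements invoke(). The interface below is a sketch of the minimal shape implied by the invoke()/stream() contract (the library's actual types may differ):

```typescript
// Minimal duck-typed chat model shape (an assumption for testing purposes).
interface DuckChatModel {
  invoke(input: unknown): Promise<{ content: string }>;
}

// A stub model that echoes its input; handy for unit-testing onRow logic.
const stubModel: DuckChatModel = {
  async invoke(input) {
    const text = typeof input === 'string' ? input : JSON.stringify(input);
    return { content: `stub: ${text}` };
  },
};

// In tests you would hand it to the adapter, e.g.:
// const llm = createLangChainAdapter(stubModel);
```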

Production notes

  • Prefer idempotent writes keyed by runKey.
  • Keep retry.maxAttempts bounded and tune backoff for your latency profile.
  • Use stopSignal for graceful shutdown in containerized workers.
  • Persist terminal failures (onFailed) to a table for replay/inspection.
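The first note above can be sketched as a run-key-keyed upsert. The table, columns, and ON CONFLICT clause are assumptions (Postgres-style SQL; adjust to the dialect your KalamDB server supports):

```typescript
type SqlExec = (query: string, params: unknown[]) => Promise<unknown>;

// Idempotent write: replaying the same message (same runKey) updates
// the existing row instead of inserting a duplicate.
async function upsertSummary(
  sql: SqlExec,
  runKey: string,
  blogId: string,
  summary: string,
): Promise<void> {
  await sql(
    `INSERT INTO blog.summaries (run_key, blog_id, summary, created, updated)
     VALUES ($1, $2, $3, NOW(), NOW())
     ON CONFLICT (run_key) DO UPDATE SET summary = EXCLUDED.summary, updated = NOW()`,
    [runKey, blogId, summary],
  );
}
```

Inside onRow you would call it as `await upsertSummary(ctx.sql, ctx.runKey, blogId, summary)`.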