Summarizer Agent (KalamDB Agents)
This KalamDB agent example shows how to consume topic events, read the changed row, generate a one-sentence summary, and write enriched data back to the same table.
Use the full source example here:
Core KalamDB agent pattern
- A row in blog.blogs is inserted or updated.
- A topic route publishes that mutation to blog.summarizer.
- The agent consumes the message with runAgent().
- The agent fetches the current row, streams summary chunks into blog.summary_chunks, then writes the final summary.
- A retry policy handles transient failures; messages that still fail after the last retry are persisted in blog.summary_failures.
This is a good starter pattern for enrichment agents (summarization, labeling, extraction, moderation) that should react to database changes in near real time.
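The flow above can be modeled in a few lines of plain TypeScript. This is a minimal in-memory sketch, not the kalam-link API: BlogRow, MutationEvent, write, summarize, and processEvent are hypothetical names, a Map stands in for blog.blogs, and an array stands in for the blog.summarizer topic. It shows why the agent re-reads the current row. When two mutations queue up before the agent runs, both events end up summarizing the latest content instead of the stale payload snapshot.

```typescript
// Hypothetical in-memory model of the agent loop; the real example uses
// KalamDB tables, a topic route, and runAgent() from kalam-link.
type BlogRow = { blog_id: string; content: string; summary: string | null };
type MutationEvent = { op: 'insert' | 'update'; row: BlogRow };

// Stand-in for blog.blogs, keyed by blog_id.
const blogs = new Map<string, BlogRow>();
// Stand-in for the blog.summarizer topic: mutations are appended in order.
const topic: MutationEvent[] = [];

// Writes a row and publishes a full snapshot of it (like payload = 'full').
function write(op: MutationEvent['op'], row: BlogRow): void {
  blogs.set(row.blog_id, { ...row });
  topic.push({ op, row: { ...row } });
}

// Hypothetical enrichment step: keep the first sentence.
// Swap in labeling, extraction, or moderation logic here.
function summarize(content: string): string {
  return content.split('.')[0].trim() + '.';
}

function processEvent(event: MutationEvent): void {
  // Re-read the current row instead of trusting the payload snapshot.
  const current = blogs.get(event.row.blog_id);
  if (!current?.content) return;
  // Write the enriched value back to the same row.
  blogs.set(current.blog_id, { ...current, summary: summarize(current.content) });
}

write('insert', { blog_id: '1', content: 'First draft. More text.', summary: null });
write('update', { blog_id: '1', content: 'Final version. Extra detail.', summary: null });

// The agent drains the topic in order.
while (topic.length > 0) processEvent(topic.shift()!);

console.log(blogs.get('1')?.summary); // "Final version."
```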
Schema and topic route
CREATE NAMESPACE IF NOT EXISTS blog;
CREATE SHARED TABLE IF NOT EXISTS blog.blogs (
blog_id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
content TEXT NOT NULL,
summary TEXT,
created TIMESTAMP NOT NULL DEFAULT NOW(),
updated TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE SHARED TABLE IF NOT EXISTS blog.summary_failures (
run_key TEXT PRIMARY KEY,
blog_id TEXT NOT NULL,
error TEXT NOT NULL,
created TIMESTAMP NOT NULL DEFAULT NOW(),
updated TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE SHARED TABLE IF NOT EXISTS blog.summary_chunks (
run_key TEXT NOT NULL,
blog_id TEXT NOT NULL,
chunk_index BIGINT NOT NULL,
chunk_text TEXT NOT NULL,
created TIMESTAMP NOT NULL DEFAULT NOW(),
updated TIMESTAMP NOT NULL DEFAULT NOW(),
PRIMARY KEY (run_key, chunk_index)
);
CREATE TOPIC blog.summarizer;
ALTER TOPIC blog.summarizer ADD SOURCE blog.blogs ON INSERT WITH (payload = 'full');
ALTER TOPIC blog.summarizer ADD SOURCE blog.blogs ON UPDATE WITH (payload = 'full');
Agent runtime wiring
await runAgent<Record<string, unknown>>({
client,
name: 'summarizer-agent',
topic: 'blog.summarizer',
groupId: 'blog-summarizer-agent',
start: 'earliest',
batchSize: 20,
retry: {
maxAttempts: 3,
initialBackoffMs: 250,
maxBackoffMs: 1500,
multiplier: 2,
},
onRow,
onFailed,
ackOnFailed: true,
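});
onRow performs the row-level summarization work. onFailed runs once a message has exhausted its retry attempts and writes the failure to blog.summary_failures, so failed runs can be inspected and reprocessed. With ackOnFailed: true, the failed message is then acknowledged, so a single bad row does not stall the consumer group.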
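To make the retry block concrete, the following sketch turns it into a delay schedule and a small retry wrapper. It assumes maxAttempts counts the first try and that the wait before retry n is initialBackoffMs * multiplier^(n-1), capped at maxBackoffMs; runAgent may count attempts or compute delays differently (for example, with jitter). RetryPolicy, backoffDelays, and processWithRetry are hypothetical names, not kalam-link exports.

```typescript
// Hypothetical model of the retry settings passed to runAgent().
interface RetryPolicy {
  maxAttempts: number;
  initialBackoffMs: number;
  maxBackoffMs: number;
  multiplier: number;
}

// Delay (ms) before each retry: maxAttempts - 1 entries, exponential and capped.
function backoffDelays(p: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < p.maxAttempts; retry++) {
    delays.push(Math.min(p.initialBackoffMs * p.multiplier ** (retry - 1), p.maxBackoffMs));
  }
  return delays;
}

// Runs handler until it succeeds or attempts run out, then calls onFailed.
// With ackOnFailed, the caller would acknowledge the message in both cases.
async function processWithRetry(
  policy: RetryPolicy,
  handler: () => Promise<void>,
  onFailed: (error: unknown) => Promise<void>,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<'ok' | 'failed'> {
  const delays = backoffDelays(policy);
  let lastError: unknown;
  for (let attempt = 0; attempt < policy.maxAttempts; attempt++) {
    try {
      await handler();
      return 'ok';
    } catch (err) {
      lastError = err;
      // No wait after the final attempt.
      if (attempt < delays.length) await sleep(delays[attempt]);
    }
  }
  await onFailed(lastError);
  return 'failed';
}

const policy: RetryPolicy = { maxAttempts: 3, initialBackoffMs: 250, maxBackoffMs: 1500, multiplier: 2 };
console.log(backoffDelays(policy)); // [ 250, 500 ]
```

Under these assumptions, the configuration above waits 250 ms and then 500 ms between attempts, so maxBackoffMs: 1500 only takes effect if maxAttempts is raised to 5 or more.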
The example persists failures with an UPDATE followed by an INSERT when no row was updated, rather than INSERT ... ON CONFLICT, because the current query planner does not support that form.
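The UPDATE-then-INSERT fallback behaves like an upsert keyed on run_key. A minimal sketch over an in-memory table; FailureRow, FailureTable, and recordFailure are hypothetical names, and in the example the two steps are ctx.sql() calls with rowsAffected read from the UPDATE result.

```typescript
// Hypothetical in-memory stand-in for blog.summary_failures (primary key: run_key).
type FailureRow = { run_key: string; blog_id: string; error: string };

class FailureTable {
  private rows = new Map<string, FailureRow>();

  // Like UPDATE ... WHERE run_key = $1: returns the number of rows changed.
  update(row: FailureRow): number {
    if (!this.rows.has(row.run_key)) return 0;
    this.rows.set(row.run_key, row);
    return 1;
  }

  // Like INSERT: rejects a duplicate primary key.
  insert(row: FailureRow): void {
    if (this.rows.has(row.run_key)) throw new Error(`duplicate run_key ${row.run_key}`);
    this.rows.set(row.run_key, row);
  }

  get(runKey: string): FailureRow | undefined {
    return this.rows.get(runKey);
  }

  get size(): number {
    return this.rows.size;
  }
}

// Upsert without ON CONFLICT: try UPDATE first, INSERT only if nothing matched.
function recordFailure(table: FailureTable, row: FailureRow): void {
  if (table.update(row) === 0) table.insert(row);
}

const failures = new FailureTable();
recordFailure(failures, { run_key: 'r1', blog_id: '42', error: 'timeout' });
recordFailure(failures, { run_key: 'r1', blog_id: '42', error: 'rate limited' });
console.log(failures.size, failures.get('r1')?.error); // 1 rate limited
```

The two statements are not atomic: if two workers recorded the same run_key at the same moment, both could see zero updated rows and the second INSERT would fail on the primary key. That is harmless when each message is handled by one consumer at a time; otherwise, catching the duplicate-key error and retrying the UPDATE closes the gap.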
Streaming chunks into a table (summarizer-agent.ts)
This variant stores each streamed chunk in blog.summary_chunks for observability and replay, then updates blog.blogs.summary with the joined result.
import { Auth, createClient, runAgent } from 'kalam-link';
import { GoogleGenerativeAI } from '@google/generative-ai';
const client = createClient({
url: process.env.KALAMDB_URL ?? 'http://127.0.0.1:8080',
auth: Auth.basic(
process.env.KALAMDB_USERNAME ?? 'root',
process.env.KALAMDB_PASSWORD ?? 'kalamdb123',
),
});
// Gemini is optional: without GEMINI_API_KEY, streamSummary() uses a local fallback.
const gemini = process.env.GEMINI_API_KEY
? new GoogleGenerativeAI(process.env.GEMINI_API_KEY).getGenerativeModel({
model: process.env.GEMINI_MODEL ?? 'gemini-2.5-flash',
})
: null;
async function* streamSummary(content: string): AsyncGenerator<{ index: number; text: string }> {
if (!gemini) {
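// Fallback: the first 220 characters of the content as a single chunk.
const fallback = content.trim().slice(0, 220).trim();
if (fallback.length > 0) {
// Append a period only if the excerpt does not already end a sentence.
yield { index: 0, text: /[.!?]$/.test(fallback) ? fallback : `${fallback}.` };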
}
return;
}
const response = await gemini.generateContentStream({
contents: [{ role: 'user', parts: [{ text: `Summarize in one sentence:\n\n${content}` }] }],
});
let index = 0;
for await (const chunk of response.stream) {
const text = chunk.text();
if (!text) continue;
yield { index, text };
index += 1;
}
}
await runAgent<Record<string, unknown>>({
client,
name: 'summarizer-agent',
topic: process.env.KALAMDB_TOPIC ?? 'blog.summarizer',
groupId: process.env.KALAMDB_GROUP ?? 'blog-summarizer-agent',
start: 'earliest',
batchSize: 20,
retry: {
maxAttempts: 3,
initialBackoffMs: 250,
maxBackoffMs: 1500,
multiplier: 2,
},
onRow: async (ctx, row) => {
const blogId = row.blog_id;
if (typeof blogId !== 'string' && typeof blogId !== 'number') return;
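// Re-read the row rather than trusting the event payload, so the summary
// reflects the latest content even if the event is stale.
const current = await ctx.queryOne<{ content: string }>(
'SELECT content FROM blog.blogs WHERE blog_id = $1',
[String(blogId)],
);
if (!current?.content) return;
// Delete chunks left by an earlier attempt of this run so a retry does not
// collide with the (run_key, chunk_index) primary key.
await ctx.sql('DELETE FROM blog.summary_chunks WHERE run_key = $1', [ctx.runKey]);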
const parts: string[] = [];
for await (const chunk of streamSummary(current.content)) {
parts.push(chunk.text);
await ctx.sql(
'INSERT INTO blog.summary_chunks (run_key, blog_id, chunk_index, chunk_text, created, updated) VALUES ($1, $2, $3, $4, NOW(), NOW())',
[ctx.runKey, String(blogId), String(chunk.index), chunk.text],
);
}
const summary = parts.join('').trim();
if (!summary) return;
await ctx.sql(
'UPDATE blog.blogs SET summary = $1, updated = NOW() WHERE blog_id = $2',
[summary, String(blogId)],
);
},
onFailed: async (ctx) => {
const blogId = ctx.row.blog_id;
const normalizedBlogId =
typeof blogId === 'string' || typeof blogId === 'number' ? String(blogId) : 'unknown';
// Upsert without ON CONFLICT: UPDATE first, then INSERT only if no row matched.
const updated = await ctx.sql(
'UPDATE blog.summary_failures SET blog_id = $2, error = $3, updated = NOW() WHERE run_key = $1',
[ctx.runKey, normalizedBlogId, String(ctx.error ?? 'unknown')],
);
if (updated.rowsAffected === 0) {
await ctx.sql(
'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
[ctx.runKey, normalizedBlogId, String(ctx.error ?? 'unknown')],
);
}
},
ackOnFailed: true,
});
For background on topics, consumers, and runtime helpers, see:
How to run this example
- Start KalamDB:
cd backend
cargo run
- Bootstrap the schema, topic routes, a sample row, and .env.local:
cd examples/summarizer-agent
./setup.sh
- Install dependencies:
npm install
- Optional: enable Gemini summaries by setting these variables (otherwise the fallback summarizer is used):
GEMINI_API_KEY=your_key_here
GEMINI_MODEL=gemini-2.5-flash
- Start the agent:
npm run start
- Trigger summarization by updating the blog content:
UPDATE blog.blogs
SET content = 'KalamDB topics let tiny agents react to data changes and enrich rows in place.'
WHERE blog_id = <sample_blog_id_from_setup_output>;
- Verify the summary output:
SELECT blog_id, content, summary, created, updated
FROM blog.blogs
WHERE blog_id = <sample_blog_id_from_setup_output>;
You should see the summary column populated after the agent consumes the update event.
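One caveat worth checking when you run this: the agent's final UPDATE of blog.blogs is itself an update to a table routed ON UPDATE, so it may be published back to blog.summarizer and trigger another summarization, which could repeat indefinitely. A minimal guard, assuming that echo does happen, is to remember a hash of the content each blog was last summarized from and skip events whose content is unchanged. lastSummarized, contentHash, isUnchanged, and markSummarized are hypothetical names; the memory is per process, so after a restart each blog is re-summarized at most once.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical per-process memory: blog_id -> hash of the content last summarized.
const lastSummarized = new Map<string, string>();

function contentHash(content: string): string {
  return createHash('sha256').update(content).digest('hex');
}

// True when this exact content was already summarized (e.g. the agent's own echo).
function isUnchanged(blogId: string, content: string): boolean {
  return lastSummarized.get(blogId) === contentHash(content);
}

// Call only after the summary UPDATE succeeds, so failed attempts are still retried.
function markSummarized(blogId: string, content: string): void {
  lastSummarized.set(blogId, contentHash(content));
}

const text = 'KalamDB topics let tiny agents react to data changes.';
console.log(isUnchanged('1', text)); // false: never summarized
markSummarized('1', text); // after the summary UPDATE succeeds
console.log(isUnchanged('1', text)); // true: the echoed event is skipped
console.log(isUnchanged('1', 'Edited content.')); // false: real edits still run
```

Inside onRow, the isUnchanged check would go right after the SELECT that loads current.content, and markSummarized would run after the final UPDATE of blog.blogs.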
Why this example is useful
- Demonstrates a complete event-driven KalamDB agent loop with minimal code.
- Shows practical retry handling plus a persistent failure sink table.
- Shows how to stream model output chunks into a table for auditability and partial-progress visibility.
- Follows the same pattern you would use for production background workers that enrich data in place.
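Because every chunk is stored with its run_key and chunk_index, a run's output can be reassembled from blog.summary_chunks, including the partial output of a run that is still streaming or that failed part-way. A minimal sketch, assuming rows shaped like the result of SELECT chunk_index, chunk_text FROM blog.summary_chunks WHERE run_key = $1; ChunkRow and joinChunks are hypothetical names.

```typescript
// Hypothetical row shape; chunk_index may come back as a string, since the
// example binds it with String(chunk.index) and clients may return BIGINT as text.
type ChunkRow = { chunk_index: number | string; chunk_text: string };

// Orders chunks numerically (rows can arrive in any order) and joins them the
// same way onRow does: concatenate, then trim. The input array is not mutated.
function joinChunks(rows: ChunkRow[]): string {
  return [...rows]
    .sort((a, b) => Number(a.chunk_index) - Number(b.chunk_index))
    .map((r) => r.chunk_text)
    .join('')
    .trim();
}

const rows: ChunkRow[] = [
  { chunk_index: '1', chunk_text: 'react to data changes ' },
  { chunk_index: 0, chunk_text: 'KalamDB topics let agents ' },
  { chunk_index: '2', chunk_text: 'and enrich rows in place.' },
];
console.log(joinChunks(rows));
// "KalamDB topics let agents react to data changes and enrich rows in place."
```

Sorting with Number() rather than comparing strings matters once a run has ten or more chunks, because a string sort would place "10" before "2".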