Summarizer Agent

Summarizer Agent (KalamDB Agents)

This KalamDB agent example shows how to consume topic events, read the changed row, generate a one-sentence summary, and write enriched data back to the same table.

Use the full source example here:

Core KalamDB agent pattern

  1. A row in blog.blogs is inserted or updated.
  2. The topic route publishes that mutation to the blog.summarizer topic.
  3. The agent consumes the message with runConsumer().
  4. The agent fetches the current row, streams summary chunks into blog.summary_chunks, then writes the final summary.
  5. Retry policy handles transient failures; hard failures are persisted in blog.summary_failures.
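
Step 5 can be pictured with a small synchronous simulation. This is only a sketch of the control flow described in the list, not the runConsumer() implementation: `deliver`, `Handler`, and `Outcome` are hypothetical names, backoff waits are left out, and the reading of `ackOnFailed` (acknowledge the message once `onFailed` has run) is an assumption based on the option name.

```typescript
// Hypothetical types for illustration; not part of any KalamDB package.
type Handler<T> = (message: T) => void;
type Outcome = { acked: boolean; attempts: number; failed: boolean };

function deliver<T>(
  message: T,
  onChange: Handler<T>,
  onFailed: (message: T, error: unknown) => void,
  maxAttempts: number,
  ackOnFailed: boolean,
): Outcome {
  let lastError: unknown = undefined;
  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    try {
      onChange(message);
      // Success: the message is acknowledged and no failure row is written.
      return { acked: true, attempts: attempt, failed: false };
    } catch (error) {
      // Treated as transient until the attempts run out.
      lastError = error;
    }
  }
  // Terminal failure: hand the last error to the failure sink.
  onFailed(message, lastError);
  // Assumption: ackOnFailed decides whether the message is then acknowledged.
  return { acked: ackOnFailed, attempts: maxAttempts, failed: true };
}
```

Under these assumptions, a handler that fails twice and then succeeds never reaches `onFailed`, while a handler that always throws reaches it exactly once after the last attempt.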

This is a good starter pattern for enrichment agents (summarization, labeling, extraction, moderation) that should react to database changes in near real time.

Because blog.blogs is a SHARED table in this example, change.user is expected to be undefined. Use change.data as the row payload inside onChange().
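
A minimal sketch of reading the row payload defensively follows. `ChangeLike` is a hypothetical stand-in for the real change type exported by the consumer package, and `extractBlogId` is an illustrative helper that mirrors the type check used in the full listing further down.

```typescript
// Hypothetical minimal shape of a change event; the real type comes from the consumer package.
type ChangeLike = { data: Record<string, unknown>; user?: string };

// Returns the primary key as a string, or null when the payload has no usable blog_id.
function extractBlogId(change: ChangeLike): string | null {
  const blogId = change.data['blog_id'];
  if (typeof blogId === 'string' || typeof blogId === 'number') {
    return String(blogId);
  }
  return null;
}

// For a SHARED table, user is expected to be absent; only data is consulted.
const sharedTableChange: ChangeLike = { data: { blog_id: 42, content: 'hello' } };
const blogId = extractBlogId(sharedTableChange);
```

Normalizing the key to a string up front keeps the later SQL parameters uniform, whichever of the two types the payload carries.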

Schema and topic route

sql snippetSQL
CREATE NAMESPACE IF NOT EXISTS blog;

CREATE SHARED TABLE IF NOT EXISTS blog.blogs (
    blog_id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
    content TEXT NOT NULL,
    summary TEXT,
    created TIMESTAMP NOT NULL DEFAULT NOW(),
    updated TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE SHARED TABLE IF NOT EXISTS blog.summary_failures (
    run_key TEXT PRIMARY KEY,
    blog_id TEXT NOT NULL,
    error TEXT NOT NULL,
    created TIMESTAMP NOT NULL DEFAULT NOW(),
    updated TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE SHARED TABLE IF NOT EXISTS blog.summary_chunks (
    run_key TEXT NOT NULL,
    blog_id TEXT NOT NULL,
    chunk_index BIGINT NOT NULL,
    chunk_text TEXT NOT NULL,
    created TIMESTAMP NOT NULL DEFAULT NOW(),
    updated TIMESTAMP NOT NULL DEFAULT NOW(),
    PRIMARY KEY (run_key, chunk_index)
);

CREATE TOPIC blog.summarizer;
ALTER TOPIC blog.summarizer ADD SOURCE blog.blogs ON INSERT WITH (payload = 'full');
ALTER TOPIC blog.summarizer ADD SOURCE blog.blogs ON UPDATE WITH (payload = 'full');

Agent runtime wiring

typescript snippettypescript
await runConsumer<Record<string, unknown>>({
  client,
  name: 'summarizer-agent',
  topic: 'blog.summarizer',
  groupId: 'blog-summarizer-agent',
  start: 'earliest',
  batchSize: 20,
  retry: {
    maxAttempts: 3,
    initialBackoffMs: 250,
    maxBackoffMs: 1500,
    multiplier: 2,
  },
  onChange,
  onFailed,
  ackOnFailed: true,
});
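
To get a feel for the retry values, the sketch below computes the waits that a capped exponential backoff would produce. How runConsumer() actually schedules retries is not stated here, so the formula (wait = initialBackoffMs × multiplier^n, capped at maxBackoffMs, with one wait before each retry) is an assumption, and `RetryPolicy` and `backoffDelays` are hypothetical names.

```typescript
// Hypothetical type mirroring the retry option fields shown in the wiring.
interface RetryPolicy {
  maxAttempts: number;
  initialBackoffMs: number;
  maxBackoffMs: number;
  multiplier: number;
}

// Waits (in ms) before attempt 2, attempt 3, ... assuming capped exponential backoff.
function backoffDelays(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  let delay = policy.initialBackoffMs;
  for (let attempt = 2; attempt <= policy.maxAttempts; attempt += 1) {
    delays.push(Math.min(delay, policy.maxBackoffMs));
    delay *= policy.multiplier;
  }
  return delays;
}

const delays = backoffDelays({
  maxAttempts: 3,
  initialBackoffMs: 250,
  maxBackoffMs: 1500,
  multiplier: 2,
});
console.log(delays); // [ 250, 500 ]
```

Under this assumed schedule, three attempts mean two waits (250 ms and 500 ms), so the 1500 ms cap would only come into play if maxAttempts were raised to 5 or more.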

onChange performs row-level summarization work through change.data. onFailed writes terminal failures to blog.summary_failures, so failed runs are inspectable and recoverable.

Current example code uses an UPDATE then INSERT fallback for failure persistence (instead of INSERT ... ON CONFLICT) to match current planner support.
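
The UPDATE-then-INSERT fallback can be isolated as a small helper. This is a sketch under assumptions: `Exec` is a hypothetical executor that runs one statement and resolves to the affected row count, and `recordFailure` is an illustrative name. The SQL strings are the ones used in the full listing.

```typescript
// Hypothetical executor: runs one statement and resolves to the affected row count.
type Exec = (sql: string, params: string[]) => Promise<number>;

async function recordFailure(
  exec: Exec,
  runKey: string,
  blogId: string,
  error: string,
): Promise<'updated' | 'inserted'> {
  // Try to refresh an existing failure row for this run first.
  const affected = await exec(
    'UPDATE blog.summary_failures SET blog_id = $2, error = $3, updated = NOW() WHERE run_key = $1',
    [runKey, blogId, error],
  );
  if (affected > 0) return 'updated';

  // No row matched, so this is the first recorded failure for the run.
  await exec(
    'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
    [runKey, blogId, error],
  );
  return 'inserted';
}
```

The two statements are not atomic. If two writers recorded a failure for the same run_key at the same moment, both could see zero updated rows and both attempt the INSERT, and the primary key on run_key would then presumably reject the second one.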

Streaming chunks into a table (summarizer-agent.ts)

This variant stores each streamed chunk in blog.summary_chunks for observability/replay and then updates blog.blogs.summary with the joined result.

ts snippetTS
import { Auth } from '@kalamdb/client';
import { createConsumerClient, runConsumer } from '@kalamdb/consumer';
import { GoogleGenerativeAI } from '@google/generative-ai';

const client = createConsumerClient({
  url: process.env.KALAMDB_URL ?? 'http://127.0.0.1:2900',
  authProvider: async () => Auth.basic(
    process.env.KALAMDB_USER ?? 'root',
    process.env.KALAMDB_PASSWORD ?? 'kalamdb123',
  ),
});

const gemini = process.env.GEMINI_API_KEY
  ? new GoogleGenerativeAI(process.env.GEMINI_API_KEY).getGenerativeModel({
      model: process.env.GEMINI_MODEL ?? 'gemini-2.5-flash',
    })
  : null;

async function* streamSummary(content: string): AsyncGenerator<{ index: number; text: string }> {
  if (!gemini) {
    const fallback = content.trim().slice(0, 220);
    if (fallback.length > 0) {
      yield { index: 0, text: `${fallback}.` };
    }
    return;
  }

  const response = await gemini.generateContentStream({
    contents: [{ role: 'user', parts: [{ text: `Summarize in one sentence:\n\n${content}` }] }],
  });

  let index = 0;
  for await (const chunk of response.stream) {
    const text = chunk.text();
    if (!text) continue;
    yield { index, text };
    index += 1;
  }
}

await runConsumer<Record<string, unknown>>({
  client,
  name: 'summarizer-agent',
  topic: process.env.KALAMDB_TOPIC ?? 'blog.summarizer',
  groupId: process.env.KALAMDB_GROUP ?? 'blog-summarizer-agent',
  start: 'earliest',
  batchSize: 20,
  retry: {
    maxAttempts: 3,
    initialBackoffMs: 250,
    maxBackoffMs: 1500,
    multiplier: 2,
  },
  onChange: async (ctx, change) => {
    const row = change.data;
    const blogId = row.blog_id;
    if (typeof blogId !== 'string' && typeof blogId !== 'number') return;

    const current = await ctx.queryOne(
      'SELECT content FROM blog.blogs WHERE blog_id = $1',
      [String(blogId)],
    );
    const content = current?.content.asString()?.trim();
    if (!content) return;

    await ctx.sql('DELETE FROM blog.summary_chunks WHERE run_key = $1', [ctx.runKey]);

    const parts: string[] = [];
    for await (const chunk of streamSummary(content)) {
      parts.push(chunk.text);
      await ctx.sql(
        'INSERT INTO blog.summary_chunks (run_key, blog_id, chunk_index, chunk_text, created, updated) VALUES ($1, $2, $3, $4, NOW(), NOW())',
        [ctx.runKey, String(blogId), String(chunk.index), chunk.text],
      );
    }

    const summary = parts.join('').trim();
    if (!summary) return;

    await ctx.sql(
      'UPDATE blog.blogs SET summary = $1, updated = NOW() WHERE blog_id = $2',
      [summary, String(blogId)],
    );
  },
  onFailed: async (ctx, change) => {
    const blogId = change.data.blog_id;
    const normalizedBlogId =
      typeof blogId === 'string' || typeof blogId === 'number' ? String(blogId) : 'unknown';

    const updated = await ctx.sql(
      'UPDATE blog.summary_failures SET blog_id = $2, error = $3, updated = NOW() WHERE run_key = $1',
      [ctx.runKey, normalizedBlogId, String(ctx.error ?? 'unknown')],
    );

    if ((updated.results?.[0]?.row_count ?? 0) === 0) {
      await ctx.sql(
        'INSERT INTO blog.summary_failures (run_key, blog_id, error, created, updated) VALUES ($1, $2, $3, NOW(), NOW())',
        [ctx.runKey, normalizedBlogId, String(ctx.error ?? 'unknown')],
      );
    }
  },
  ackOnFailed: true,
});
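
For replay, the stored chunks of one run can be put back together in chunk_index order. The sketch below works on rows that have already been fetched. `ChunkRow` and `reassembleSummary` are hypothetical names, and it assumes chunk_index arrives as a JavaScript number, which may not hold for BIGINT columns depending on the client.

```typescript
// Hypothetical row shape mirroring the blog.summary_chunks columns used here.
interface ChunkRow {
  run_key: string;
  chunk_index: number; // assumption: BIGINT already converted to number
  chunk_text: string;
}

// Rebuilds the summary of one run from its chunks, regardless of fetch order.
function reassembleSummary(rows: ChunkRow[], runKey: string): string {
  return rows
    .filter((row) => row.run_key === runKey) // filter copies, so the input is not mutated by sort
    .sort((a, b) => a.chunk_index - b.chunk_index)
    .map((row) => row.chunk_text)
    .join('')
    .trim();
}

const rows: ChunkRow[] = [
  { run_key: 'a', chunk_index: 1, chunk_text: 'feed agents.' },
  { run_key: 'b', chunk_index: 0, chunk_text: 'other run' },
  { run_key: 'a', chunk_index: 0, chunk_text: ' Topics ' },
];
const replayed = reassembleSummary(rows, 'a');
```

Joining with an empty string and trimming once at the end matches how the agent builds the final summary from its in-memory parts.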

For background on topics, consumers, and runtime helpers, see:

How to run this example

  1. Start KalamDB:
bash snippetBASH
cd backend
cargo run
  2. Bootstrap schema, topic routes, sample row, and .env.local:
bash snippetBASH
cd examples/summarizer-agent
./setup.sh
  3. Install dependencies:
bash snippetBASH
npm install
  4. Optional: enable Gemini summaries (otherwise the fallback summarizer is used):
dotenv snippetdotenv
GEMINI_API_KEY=your_key_here
GEMINI_MODEL=gemini-2.5-flash
  5. Start the agent:
bash snippetBASH
npm run start
  6. Trigger summarization by updating blog content:
sql snippetSQL
UPDATE blog.blogs
SET content = 'KalamDB topics let tiny agents react to data changes and enrich rows in place.'
WHERE blog_id = <sample_blog_id_from_setup_output>;
  7. Verify summary output:
sql snippetSQL
SELECT blog_id, content, summary, created, updated
FROM blog.blogs
WHERE blog_id = <sample_blog_id_from_setup_output>;

You should see summary populated after the agent consumes the update event.
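
If no GEMINI_API_KEY is set, the value you see in summary comes from the fallback path rather than from a model. That path is isolated below so its output is easy to predict. `fallbackSummary` and `FALLBACK_MAX_CHARS` are illustrative names, and the logic follows the first branch of streamSummary() in the listing above.

```typescript
const FALLBACK_MAX_CHARS = 220; // same cut-off as the listing above

// Trims the content, keeps at most 220 characters, and appends a period.
// Returns null for empty content, in which case the agent writes no summary.
function fallbackSummary(content: string): string | null {
  const truncated = content.trim().slice(0, FALLBACK_MAX_CHARS);
  return truncated.length > 0 ? `${truncated}.` : null;
}

const short = fallbackSummary('  KalamDB topics let tiny agents react to data changes  ');
```

Because the fallback only truncates, content that already ends with a period produces a summary ending in two periods, which is one quick way to tell the fallback apart from a model-generated summary.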

Why this example is useful

  • Demonstrates a complete event-driven KalamDB agent loop with minimal code.
  • Shows practical retry handling plus a persistent failure sink table.
  • Shows how to stream model output chunks into a table for auditability and partial-progress visibility.
  • Follows the same pattern you would use for production-style background workers that enrich data in place.