Agent Worker Queue

Use this pattern for asynchronous processing tasks such as moderation, enrichment, classification, and indexing.

Core pattern

  • Topic routes capture the relevant source events.
  • A worker group consumes messages in batches.
  • Each worker acknowledges an offset only after the message has been processed successfully.
  • Retries and idempotency are handled in application logic.
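
Because retries and idempotency are left to the application, a worker typically needs a way to recognize a message it has already handled and a policy for spacing out retries. The following is a minimal sketch of both, not part of the KalamDB API: `MessageRef`, `idempotencyKey`, `CompletedTasks`, and `retryDelayMs` are hypothetical names, and the assumption that a message can be identified by topic, partition, and offset is ours.

```typescript
// Hypothetical message coordinates; the real consumer message shape may differ.
interface MessageRef {
  topic: string;
  partition: number;
  offset: number;
}

// Builds a stable key that identifies one message within a topic.
function idempotencyKey(ref: MessageRef): string {
  return `${ref.topic}/${ref.partition}/${ref.offset}`;
}

// In-memory record of completed work, for illustration only.
// A real worker would persist this (for example in a table with a
// unique key) so duplicates are still detected after a restart.
class CompletedTasks {
  private readonly done = new Set<string>();

  isDone(ref: MessageRef): boolean {
    return this.done.has(idempotencyKey(ref));
  }

  markDone(ref: MessageRef): void {
    this.done.add(idempotencyKey(ref));
  }
}

// Exponential backoff with a cap: 100 ms, 200 ms, 400 ms, ... up to maxMs.
// `attempt` is 1-based.
function retryDelayMs(attempt: number, baseMs = 100, maxMs = 5000): number {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}
```

A handler would check `isDone` before doing any work, call `markDone` once the work succeeds, and wait `retryDelayMs(attempt)` between failed attempts. If the same job can arrive under different offsets, a business-level key (such as a job ID from the payload) may be a better choice than the offset.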

Example schema and topic route

```sql
CREATE TOPIC agent.tasks PARTITIONS 4;

ALTER TOPIC agent.tasks
ADD SOURCE app.jobs
ON INSERT
WITH (payload = 'full');
```

Example implementation

```typescript
import { Auth } from '@kalamdb/client';
import { createConsumerClient } from '@kalamdb/consumer';

const client = createConsumerClient({
  url: 'http://localhost:2900',
  authProvider: async () => Auth.basic('agent-worker', 'Secret123!'),
});

const worker = client.consumer({
  topic: 'agent.tasks',
  group_id: 'agent-workers',
  // Acknowledge manually so an offset is committed only after success.
  auto_ack: false,
  batch_size: 20,
  concurrency_per_partition: 2,
});

await worker.run(async (ctx) => {
  // processTask is the application's own handler (not shown here).
  await processTask(ctx.message.value);
  await ctx.ack();
});
```
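
The handler above acknowledges as soon as `processTask` returns, and does not show what happens when it throws. One way to add bounded retries while still acknowledging only on success is sketched below. `AckContext` and `ackOnSuccess` are hypothetical names: the interface covers only the two context members used in the example above, and the sketch assumes that an unacknowledged message is delivered again, which this page does not confirm.

```typescript
// Hypothetical subset of the handler context used in the example above.
interface AckContext<T> {
  message: { value: T };
  ack(): Promise<void>;
}

// Wraps a task so the offset is acknowledged only after the task succeeds.
// The task is retried up to maxAttempts times; if every attempt fails,
// the last error is rethrown and the message is left unacknowledged.
function ackOnSuccess<T>(
  task: (value: T) => Promise<void>,
  maxAttempts = 3,
): (ctx: AckContext<T>) => Promise<void> {
  return async (ctx) => {
    let lastError: unknown;
    let succeeded = false;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await task(ctx.message.value);
        succeeded = true;
        break;
      } catch (err) {
        lastError = err; // remember the failure and try again
      }
    }

    if (!succeeded) {
      throw lastError;
    }

    // Acknowledge outside the retry loop so an ack failure
    // does not cause the task itself to run again.
    await ctx.ack();
  };
}
```

Under these assumptions the wrapper could be passed directly to the worker, for example `await worker.run(ackOnSuccess(processTask));`. Because a task may run more than once before it succeeds, the task itself should be safe to repeat.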
