Agent Worker Queue
Use this pattern for asynchronous processing tasks such as moderation, enrichment, classification, and indexing.
Core pattern
- topic routes capture relevant source events
- worker group consumes in batches
- each worker acknowledges offsets after successful processing
- retries and idempotency are handled in application logic
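
Because retries and idempotency are left to the application, it can help to see roughly what that logic might look like. The following is a minimal sketch, not part of the KalamDB client: `Task`, `processOnce`, and `withRetry` are hypothetical names, the task is assumed to carry a stable `id`, and the in-memory `Set` stands in for whatever durable store a real worker would use.

```typescript
// Hypothetical helpers for illustration; none of these names come from KalamDB.

// Assumption: each task carries a stable, unique id usable for deduplication.
type Task = { id: string; payload: unknown };

// In-memory record of completed task ids. A real worker would likely use a
// durable store so deduplication survives restarts and is shared by workers.
const completed = new Set<string>();

// Run `handler` only if this task id has not completed before, so that a
// redelivered message becomes a no-op. Returns true if the handler ran.
// Note: this sketch does not guard against two concurrent deliveries of the
// same id that are both in flight at once.
async function processOnce(
  task: Task,
  handler: (task: Task) => Promise<void>,
): Promise<boolean> {
  if (completed.has(task.id)) return false; // duplicate delivery, skip
  await handler(task);
  completed.add(task.id); // mark done only after the handler succeeds
  return true;
}

// Call `fn` up to `attempts` times, waiting with exponential backoff
// (baseDelayMs, 2x, 4x, ...) between failed tries. Rethrows the last error.
async function withRetry<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 100,
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (i < attempts - 1) {
        const delay = baseDelayMs * 2 ** i;
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  throw lastError;
}
```

Inside a consumer handler, one plausible arrangement is to wrap the work as `withRetry(() => processOnce(task, handle))` and acknowledge only after it resolves. If every attempt fails, the message stays unacknowledged and can be redelivered.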
Example schema and topic route
    CREATE TOPIC agent.tasks PARTITIONS 4;

    ALTER TOPIC agent.tasks
    ADD SOURCE app.jobs
    ON INSERT
    WITH (payload = 'full');

Example implementation
    import { Auth } from '@kalamdb/client';
    import { createConsumerClient } from '@kalamdb/consumer';

    const client = createConsumerClient({
      url: 'http://localhost:2900',
      authProvider: async () => Auth.basic('agent-worker', 'Secret123!'),
    });

    const worker = client.consumer({
      topic: 'agent.tasks',
      group_id: 'agent-workers',
      auto_ack: false, // acknowledge manually, after processing succeeds
      batch_size: 20,
      concurrency_per_partition: 2,
    });

    await worker.run(async (ctx) => {
      // processTask is the application's own handler (not defined here).
      await processTask(ctx.message.value);
      // Acknowledge only once processing has completed without throwing.
      await ctx.ack();
    });

Code references
- Starter worker scaffold: https://github.com/kalamdb/KalamDB/tree/main/examples/simple-typescript
- Topic API contracts: API Reference