Consumer Runtime
@kalamdb/consumer exposes a high-level worker runtime on top of topic consumers:
runConsumer()for row/change-driven handlers with retry/failure hooksrunAgent()as a deprecated compatibility alias for the same lifecycle modelcreateLangChainAdapter()for plugging in LangChain chat models
These APIs are exported directly from @kalamdb/consumer.
The client you pass to these helpers should be created with createConsumerClient() or otherwise satisfy the worker-side ConsumerClientLike contract.
runConsumer() quick start
Handler context (ConsumerRunContext)
onChange(ctx, change) receives:
- metadata:
name,topic,groupId,runKey,attempt,maxAttempts - source data:
change,message, producinguser, typedop,timestampMs,partitionId, andoffset - helpers:
sql(),queryOne(),queryAll(),ack() - LLM helper:
llm(whenllmadapter + optionalsystemPromptare configured)
Use change.data for the decoded changed row/event. The same object is available as ctx.change, but metadata lives on ctx, not on change. The older onRow(ctx, row) and ctx.row names are deprecated aliases.
runKey
Default format is:
<name>:<topic>:<partition_id>:<offset>
Override with runKeyFactory when integrating custom idempotency keys.
Retry and ACK behavior
runConsumer() always consumes with manual ack semantics (auto_ack: false) and handles acking explicitly:
onChangesucceeds: message is acked.onChangethrows: retried based onretrypolicy.- retries exhausted:
- if
onFailedis missing, message is not acked. - if
onFailedsucceeds andackOnFailed !== false, message is acked. - if
onFailedthrows, message is not acked.
- if
Hooks:
onRetry({ error, attempt, maxAttempts, backoffMs, runKey, message })onError({ error, runKey, message })
Retry policy options
The retry object supports additional tuning:
jitterRatio— add randomness to backoff (0 disables jitter)shouldRetry(error, attempt)— classify retryable failures (defaults to retrying)
Other useful runConsumer options include partitionId, timeoutSeconds, and stopSignal for graceful shutdown.
runConsumer() also supervises the underlying consumer loop. If the server is temporarily down or the connection drops, it retries with exponential backoff and jitter until stopSignal aborts. Configure this with connectionRetry.
Row parsing
By default, the runtime parses KalamDB topic rows like this:
- it reads
message.payload - if
message.payloadhas a nestedrowobject, it usespayload.rowfor backward compatibility - otherwise it uses
message.payloaddirectly
That means apps using the ORM generator can pass the generated row type directly and skip parser boilerplate:
You can override with changeParser(message):
Return null only when you intentionally want to skip agent handling for a message.
Message-Level Handling
Use onMessage only for older message-level code that wants to read directly from ctx.message:
Use this when you want the retry/failure lifecycle but do not need row parsing.
runConsumer() acks after successful handler completion. Call ctx.ack() only if you explicitly want to ack before returning.
LangChain integration
createLangChainAdapter() accepts a duck-typed chat model with invoke() and optional stream():
ctx.llm.complete() accepts either a plain string prompt or structured message input.
Production notes
- Prefer idempotent writes keyed by
runKey. - Keep
retry.maxAttemptsbounded and tune backoff for your latency profile. - Use
stopSignalfor graceful shutdown in containerized workers. - Leave the default connection retry enabled for long-running workers so transient restarts do not exit the process.
- Persist terminal failures (
onFailed) to a table for replay/inspection.