Consumer Runtime
@kalamdb/consumer exposes a high-level worker runtime on top of topic consumers:
- runConsumer() for row/change-driven handlers with retry/failure hooks
- runAgent() as a deprecated compatibility alias for the same lifecycle model
- createLangChainAdapter() for plugging in LangChain chat models
These APIs are exported directly from @kalamdb/consumer.
The client you pass to these helpers should be created with createConsumerClient() or otherwise satisfy the worker-side ConsumerClientLike contract.
runConsumer() quick start
Handler context (ConsumerRunContext)
onChange(ctx, change) receives:
- runtime metadata: ctx.name, ctx.runKey, ctx.attempt, and ctx.maxAttempts
- source data: change.data is the decoded row/event payload
- change metadata: change.user, typed change.op, change.key, change.timestampMs, change.partitionId, change.offset, change.topic, change.groupId, and metadata-only change.message
- helpers: sql(), queryOne(), queryAll(), ack()
- LLM helper: llm (when an llm adapter and optional systemPrompt are configured)
Use change.data for the decoded changed row/event and use the other change.* fields for metadata about that same event. High-level ctx is intentionally reserved for runtime execution state and helpers, so it does not expose message, change, user, op, or offset duplicates. The high-level change.message object intentionally omits payload, deprecated value, and raw transport change fields, so the decoded row lives in exactly one place: change.data. change.user is populated only when the topic event carries a subject and is expected to be undefined for shared-table routes such as the summarizer example. The older onRow(ctx, row) alias remains deprecated; use onChange(ctx, change) or onFailed(ctx, change) for the stable shape.
Generated ORM row types use the same interface. Pass the generated row type as the first runConsumer<T>() generic, read the row from change.data, and read event metadata from change.op, change.user, change.offset, or change.message. No ORM-specific wrapper is needed.
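To make the split between ctx and change concrete, here is a minimal sketch that uses local stand-in types rather than the package's real exports. RunContextSketch, ChangeSketch, NoteRow, and describeChange are hypothetical names, and the field types (for example user as an optional string and op as a plain string) are assumptions. Only the field names come from the description above.

```typescript
// Hypothetical local mirrors of the documented shapes; field types are assumptions.
interface RunContextSketch {
  name: string;
  runKey: string;
  attempt: number;
  maxAttempts: number;
}

interface ChangeSketch<T> {
  data: T;          // decoded row/event payload
  op: string;       // the real runtime types this more narrowly
  user?: string;    // undefined when the event carries no subject
  topic: string;
  partitionId: number;
  offset: number;
}

// Stand-in for a generated ORM row type.
interface NoteRow {
  id: number;
  title: string;
}

// Runtime state comes from ctx; row and event metadata come from change.
function describeChange(ctx: RunContextSketch, change: ChangeSketch<NoteRow>): string {
  const who = change.user ?? "no subject";
  return `[${ctx.runKey} attempt ${ctx.attempt}/${ctx.maxAttempts}] ${change.op} note ${change.data.id} by ${who}`;
}
```

In a real worker the same reads would happen inside onChange(ctx, change), with the generated row type passed as the runConsumer<T>() generic.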
runKey
Default format is:
<name>:<topic>:<partition_id>:<offset>
Override with runKeyFactory when integrating custom idempotency keys.
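As an illustration of the default format, the sketch below builds the same string from its four parts. RunKeyInput, defaultRunKey, and orderRunKey are hypothetical helpers, not exports of @kalamdb/consumer, and the exact arguments that runKeyFactory receives are not shown here, so treat the input shape as an assumption.

```typescript
// Hypothetical input shape: only the four fields named in the default format.
interface RunKeyInput {
  name: string;
  topic: string;
  partitionId: number;
  offset: number;
}

// Builds "<name>:<topic>:<partition_id>:<offset>".
function defaultRunKey(input: RunKeyInput): string {
  return `${input.name}:${input.topic}:${input.partitionId}:${input.offset}`;
}

// A custom factory might key on a business identifier instead,
// so replays of the same order map to the same idempotency key.
function orderRunKey(input: RunKeyInput, orderId: string): string {
  return `${input.name}:order:${orderId}`;
}
```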
Retry and ACK behavior
runConsumer() does not expose an auto_ack option. Internally it creates the underlying low-level consumer with auto_ack: false so the runtime can control retries and ack timing explicitly:
- onChange succeeds: message is acked.
- onChange throws: retried based on retry policy.
- retries exhausted:
  - if onFailed is missing, message is not acked.
  - if onFailed succeeds and ackOnFailed !== false, message is acked.
  - if onFailed throws, message is not acked.
If you need to choose auto_ack yourself, use client.consumer() directly instead of runConsumer().
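The ack rules above can be summarized as a small decision function. This is a sketch of the documented outcomes only, not the runtime's internal code. AckScenario and shouldAck are hypothetical names.

```typescript
type HandlerResult = "ok" | "throws";

// Hypothetical summary of one message after all retries have run.
interface AckScenario {
  onChange: HandlerResult;   // outcome of the final onChange attempt
  onFailed?: HandlerResult;  // undefined means no onFailed hook is configured
  ackOnFailed?: boolean;     // only an explicit false suppresses the ack
}

function shouldAck(s: AckScenario): boolean {
  if (s.onChange === "ok") return true;        // success: always acked
  if (s.onFailed === undefined) return false;  // retries exhausted, no onFailed hook
  if (s.onFailed === "throws") return false;   // onFailed itself failed
  return s.ackOnFailed !== false;              // onFailed succeeded
}
```

A message that is not acked stays uncommitted for the consumer group, which is why persisting terminal failures in onFailed is usually preferable to leaving the hook out.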
Hooks:
- onRetry({ error, attempt, maxAttempts, backoffMs, runKey, message })
- onError({ error, runKey, message })
- onConnect()
- onConnectionRetry({ error, attempt, maxAttempts, backoffMs })
- onConnectionRestored({ attempt })
- onConnectionError({ error, message, recoverable, attempt, backoffMs })
Retry policy options
The retry object supports additional tuning:
- jitterRatio: adds randomness to backoff (0 disables jitter)
- shouldRetry(error, attempt): classifies retryable failures (defaults to retrying)
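One plausible way these two options could interact with an exponential backoff is sketched below. The fields initialBackoffMs and maxBackoffMs, the symmetric jitter formula, and the helper names backoffMs and nextAction are assumptions made for illustration. The real runtime may compute delays differently.

```typescript
// Hypothetical policy shape; only jitterRatio, shouldRetry, and maxAttempts are named in this document.
interface RetrySketch {
  maxAttempts: number;
  initialBackoffMs: number; // assumption
  maxBackoffMs: number;     // assumption
  jitterRatio: number;      // 0 disables jitter
  shouldRetry?: (error: unknown, attempt: number) => boolean;
}

// Exponential backoff with symmetric jitter: base * (1 +/- jitterRatio).
function backoffMs(policy: RetrySketch, attempt: number, random: () => number = Math.random): number {
  const base = Math.min(policy.maxBackoffMs, policy.initialBackoffMs * Math.pow(2, attempt - 1));
  const spread = base * policy.jitterRatio;
  // random() in [0, 1) maps to an offset in [-spread, +spread).
  return Math.max(0, Math.round(base + (random() * 2 - 1) * spread));
}

// Decide what happens after a failed attempt; shouldRetry defaults to retrying.
function nextAction(policy: RetrySketch, error: unknown, attempt: number): "retry" | "give-up" {
  if (attempt >= policy.maxAttempts) return "give-up";
  const retryable = policy.shouldRetry ? policy.shouldRetry(error, attempt) : true;
  return retryable ? "retry" : "give-up";
}
```

Passing the random source as a parameter keeps the delay calculation deterministic in tests.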
Other useful runConsumer options include partitionId, timeoutSeconds, and stopSignal for graceful shutdown.
runConsumer() also supervises the underlying consumer loop. If the server is temporarily down or the connection drops, it retries with exponential backoff and jitter until stopSignal aborts. Configure this with connectionRetry:
Use the connection hooks when you want operators to see the worker lifecycle clearly in logs:
onConnect() fires once when runConsumer() reaches its first healthy poll, and then again only after a later outage has been recovered through the same runtime. onConnectionRestored() remains the explicit recovery-only hook.
onConnectionError() now mirrors the same connection classification used by createConsumerClient(): fatal configuration/auth/bootstrap failures are surfaced with recoverable: false, while transient reachability failures stay retryable.
onConnectionRestored() fires once after a retriable connection failure when the runtime successfully reaches KalamDB again. With the default createConsumerClient() implementation, this happens after the first successful poll, even if that poll returns no messages.
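The firing rules for onConnect() and onConnectionRestored() can be modeled as a small state tracker. This is a sketch of the described behavior, not the runtime's implementation. ConnectionTracker is a hypothetical class, and both the relative order of the two hooks and the treatment of a failure before the first healthy poll are assumptions.

```typescript
type ConnectionEvent = "onConnect" | "onConnectionRestored";

class ConnectionTracker {
  private connected = false;
  private sawFailure = false;

  // Call after every successful poll, including polls that return no messages.
  pollSucceeded(): ConnectionEvent[] {
    const events: ConnectionEvent[] = [];
    if (!this.connected) {
      events.push("onConnect"); // first healthy poll, or first poll after an outage
      if (this.sawFailure) events.push("onConnectionRestored"); // recovery only
    }
    this.connected = true;
    this.sawFailure = false;
    return events;
  }

  // Call when a retriable connection failure is observed.
  pollFailed(): void {
    this.connected = false;
    this.sawFailure = true;
  }
}
```

Under this model a steady stream of healthy polls produces no further events, so both hooks are safe places for one-line operator logs.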
Row parsing
By default, the runtime parses KalamDB topic rows like this:
- it reads message.payload
- if message.payload has a nested row object, it uses payload.row for backward compatibility
- otherwise it uses message.payload directly
That means apps using the ORM generator can pass the generated row type directly and skip parser boilerplate:
You can override with changeParser(message):
Return null only when you intentionally want to skip agent handling for a message.
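The default parsing rule described in this section, plus a custom parser that returns null to skip a message, might look roughly like the sketch below. TopicMessageSketch, defaultParse, and parseActiveOnly are hypothetical names, and the message is reduced to its payload field. The real message type carries more fields.

```typescript
// Hypothetical minimal message shape: only the payload matters for parsing.
interface TopicMessageSketch {
  payload: unknown;
}

function isObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Prefer payload.row when it is a nested object; otherwise use the payload itself.
function defaultParse<T>(message: TopicMessageSketch): T {
  const payload = message.payload;
  if (isObject(payload) && isObject(payload.row)) {
    const row: unknown = payload.row;
    return row as T;
  }
  return payload as T;
}

interface AccountRow {
  id: number;
  active: boolean;
}

// Custom parser in the spirit of changeParser(message): null skips the message.
function parseActiveOnly(message: TopicMessageSketch): AccountRow | null {
  const row = defaultParse<AccountRow>(message);
  return row.active ? row : null;
}
```

The casts perform no runtime validation, so a production parser would typically check the row shape before returning it.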
Message-Level Handling
Use onMessage only for older single-argument handlers that still want the retry/failure lifecycle:
onMessage remains as a deprecated compatibility hook. New workers should use onChange(ctx, change) so change data and per-change metadata stay together.
runConsumer() acks after successful handler completion. Call ctx.ack() only if you explicitly want to ack before returning.
LangChain integration
llm and systemPrompt are still supported optional RunConsumerOptions fields. If you do not need model-assisted workers, leave them undefined and ctx.llm will be null.
createLangChainAdapter() accepts a duck-typed chat model with invoke() and optional stream():
ctx.llm.complete() accepts either a plain string prompt or structured message input.
Production notes
- Prefer idempotent writes keyed by runKey.
- Keep retry.maxAttempts bounded and tune backoff for your latency profile.
- Use stopSignal for graceful shutdown in containerized workers.
- Leave the default connection retry enabled for long-running workers so transient restarts do not exit the process.
- Persist terminal failures (onFailed) to a table for replay/inspection.