Topic Consumer

Shows how to build a background worker that consumes topic events with explicit offset commits.

Source: link/sdks/rust/examples/topic-consumer/

What it does

  1. Creates a table and a topic
  2. Routes table inserts to the topic (ALTER TOPIC ... ADD SOURCE)
  3. Inserts a row (which publishes to the topic)
  4. Polls the consumer group for records
  5. Marks records processed and commits offsets synchronously

Run it

BASH
cd link/sdks/rust
export KALAMDB_SERVER_URL=http://localhost:2900
export KALAMDB_ROOT_PASSWORD=kalamdb123
cargo run -p topic-consumer

Expected output:

TEXT
polled 1 record(s)
offset=0 payload={"row":{...}}
committed up to offset 0

Requires the consumer feature (enabled in the example crate).
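
The run commands above pass connection settings through the KALAMDB_SERVER_URL and KALAMDB_ROOT_PASSWORD environment variables. How the example crate reads them is not shown here, so the following is only a minimal sketch of one way a worker could load such settings with the standard library. The Settings struct, the settings_from helper, and the fallback to http://localhost:2900 are assumptions made for illustration and are not part of kalam-client.

```rust
use std::env;

/// Connection settings for a worker. Hypothetical type, not part of kalam-client.
#[derive(Debug, PartialEq)]
struct Settings {
    server_url: String,
    root_password: String,
}

/// Builds settings from a lookup function, so the logic can be exercised
/// without touching the real process environment.
fn settings_from(lookup: impl Fn(&str) -> Option<String>) -> Result<Settings, String> {
    // Assumption: fall back to the URL used in the run commands when unset.
    let server_url = lookup("KALAMDB_SERVER_URL")
        .unwrap_or_else(|| "http://localhost:2900".to_string());
    // Assumption: a missing password is treated as a configuration error.
    let root_password = lookup("KALAMDB_ROOT_PASSWORD")
        .ok_or_else(|| "KALAMDB_ROOT_PASSWORD is not set".to_string())?;
    Ok(Settings {
        // Strip trailing slashes so paths can be appended consistently.
        server_url: server_url.trim_end_matches('/').to_string(),
        root_password,
    })
}

fn main() {
    // In a real binary, read from the process environment.
    match settings_from(|key| env::var(key).ok()) {
        Ok(s) => println!(
            "connecting to {} (password: {} chars)",
            s.server_url,
            s.root_password.len()
        ),
        Err(e) => eprintln!("configuration error: {e}"),
    }
}
```

Passing the lookup as a closure keeps the parsing logic testable with fixed values, since tests do not need to mutate global environment state.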

Topic routing

SQL
CREATE TOPIC default.my_topic;
ALTER TOPIC default.my_topic
  ADD SOURCE default.my_table ON INSERT WITH (payload = 'full');

Consumer pattern

RUST
use std::time::Duration;
use kalam_client::AutoOffsetReset;

let mut consumer = client
    .consumer()
    .group_id("my-worker")
    .topic("default.my_topic")
    .auto_offset_reset(AutoOffsetReset::Earliest)
    .enable_auto_commit(false)
    .poll_timeout(Duration::from_secs(5))
    .build()?;

let records = consumer.poll().await?;
for record in &records {
    // process record.payload
    consumer.mark_processed(record);
}
consumer.commit_sync().await?;
consumer.close().await?;
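
With auto-commit disabled, the pattern above only advances the group's position when the worker commits explicitly. To illustrate why that matters, here is a small in-memory model of poll, mark-processed, and commit. MockConsumer, Record, and restart are hypothetical names invented for this sketch. It does not use kalam-client and makes no claim about how the server tracks offsets. It only shows the general at-least-once behaviour that explicit commits are meant to provide: work that was marked processed but never committed is delivered again after a restart.

```rust
/// One topic record. Hypothetical type for this sketch only.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    offset: u64,
    payload: String,
}

/// In-memory stand-in for a consumer group member; NOT the kalam-client API.
struct MockConsumer {
    log: Vec<Record>,       // the topic's records, in offset order
    committed: Option<u64>, // highest offset committed for the group
    processed: Option<u64>, // highest offset marked processed, not yet committed
}

impl MockConsumer {
    fn new(payloads: &[&str]) -> Self {
        let log = payloads
            .iter()
            .enumerate()
            .map(|(i, p)| Record { offset: i as u64, payload: p.to_string() })
            .collect();
        Self { log, committed: None, processed: None }
    }

    /// Returns every record after the last committed offset.
    fn poll(&self) -> Vec<Record> {
        let start = self.committed.map_or(0, |o| o + 1) as usize;
        self.log.iter().skip(start).cloned().collect()
    }

    /// Remembers the highest processed offset without committing it.
    fn mark_processed(&mut self, record: &Record) {
        let highest = self.processed.map_or(record.offset, |p| p.max(record.offset));
        self.processed = Some(highest);
    }

    /// Promotes processed progress to committed and returns the committed offset.
    fn commit(&mut self) -> Option<u64> {
        if self.processed > self.committed {
            self.committed = self.processed;
        }
        self.committed
    }

    /// Simulates a worker crash: uncommitted progress is lost.
    fn restart(&mut self) {
        self.processed = None;
    }
}

fn main() {
    let mut consumer = MockConsumer::new(&["a", "b"]);

    // Process the first record, then crash before committing.
    let records = consumer.poll();
    consumer.mark_processed(&records[0]);
    consumer.restart();

    // Nothing was committed, so both records are delivered again.
    let records = consumer.poll();
    println!("polled {} record(s)", records.len());
    for record in &records {
        println!("offset={} payload={}", record.offset, record.payload);
        consumer.mark_processed(record);
    }

    // After an explicit commit there is nothing left to redeliver.
    let committed = consumer.commit();
    println!("committed up to offset {:?}", committed);
    assert!(consumer.poll().is_empty());
}
```

Because records can be delivered more than once under this model, processing logic in a real worker is usually written to be idempotent.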

Install with the consumer feature:

BASH
cargo add kalam-client --features native-sdk,consumer
