Realtime Subscriptions
kalam-link supports realtime subscriptions via WebSocket.
Table subscription (sugar API)
const unsub = await client.subscribe('app.messages', (event) => {
  console.log(event.type);
});
In source, this is sugar for:
client.subscribeWithSql(`SELECT * FROM ${tableName}`, callback, options)
SQL subscription
import { ChangeType, MessageType } from 'kalam-link';
const unsub = await client.subscribeWithSql(
  'SELECT * FROM app.messages WHERE conversation_id = 42',
  (event) => {
    if (event.type === MessageType.Change) {
      // Only react to inserts; updates and deletes are ignored.
      if (event.change_type !== ChangeType.Insert) return;
      console.log(event.change_type, event.rows);
    }
  },
  { batch_size: 100, last_rows: 50 }
);
Event model
Use runtime enums from types.ts:
import { MessageType, ChangeType } from 'kalam-link';
Event sequence commonly observed:
- subscription_ack
- one or more initial_data_batch
- change events (insert|update|delete)
- optional error
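The sequence above can be handled with a single dispatch on the event type. The sketch below is only an illustration: `SketchEvent`, `StreamState`, `createState`, and `applyEvent` are hypothetical names, and the string literals and the `code`/`message` fields are assumptions modeled on the event names listed above. In real code, compare against the `MessageType` and `ChangeType` enums rather than raw strings.

```typescript
// Hypothetical event shapes for illustration only; the real definitions
// live in kalam-link's types.ts and may differ.
type SketchEvent =
  | { type: 'subscription_ack' }
  | { type: 'initial_data_batch'; rows: unknown[] }
  | { type: 'change'; change_type: 'insert' | 'update' | 'delete'; rows: unknown[] }
  | { type: 'error'; code: string; message: string };

interface StreamState {
  acknowledged: boolean;
  snapshotRows: number; // rows received via initial_data_batch events
  liveChanges: number;  // number of change events received
  lastError?: string;
}

function createState(): StreamState {
  return { acknowledged: false, snapshotRows: 0, liveChanges: 0 };
}

// Apply one event to the state and return the updated copy.
function applyEvent(state: StreamState, event: SketchEvent): StreamState {
  switch (event.type) {
    case 'subscription_ack':
      return { ...state, acknowledged: true };
    case 'initial_data_batch':
      return { ...state, snapshotRows: state.snapshotRows + event.rows.length };
    case 'change':
      return { ...state, liveChanges: state.liveChanges + 1 };
    case 'error':
      return { ...state, lastError: `${event.code}: ${event.message}` };
    default:
      // Unknown event types leave the state untouched.
      return state;
  }
}
```

Keeping the handler a pure function of state and event makes it easy to test without a live WebSocket connection; the subscription callback then only needs to call `applyEvent` and store the result.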
Subscription options
{
  batch_size?: number;
  last_rows?: number;
  from_seq_id?: number;
}
Use from_seq_id to resume after a reconnect. getLastSeqId() returns a string, so convert it to a number explicitly when restoring:
const seq = client.getLastSeqId(subscriptionId);
const options = seq ? { from_seq_id: Number(seq) } : undefined;
SQL safety for dynamic filters
subscribeWithSql() accepts a SQL string. For dynamic values:
- Prefer validated numeric IDs before interpolation.
- Do not interpolate raw user text directly into SQL.
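When a filter value cannot be reduced to a number, one option is to accept only values from a fixed allowlist, so that no user-supplied text reaches the SQL string unchecked. This is a sketch under assumptions: the `status` column, the allowed values, and the helper names `isAllowedStatus` and `buildStatusSql` are hypothetical and not part of kalam-link.

```typescript
// Hypothetical allowlist; replace with the values your schema actually uses.
const ALLOWED_STATUSES = ['sent', 'delivered', 'read'] as const;
type Status = (typeof ALLOWED_STATUSES)[number];

// Type guard: true only for exact matches against the allowlist.
function isAllowedStatus(value: string): value is Status {
  return (ALLOWED_STATUSES as readonly string[]).indexOf(value) !== -1;
}

// Build the subscription SQL from a user-supplied status.
// Only allowlisted literals are interpolated, so quotes or SQL fragments
// in the input cause a throw instead of reaching the query.
function buildStatusSql(status: string): string {
  if (!isAllowedStatus(status)) {
    throw new Error(`unsupported status: ${JSON.stringify(status)}`);
  }
  return `SELECT * FROM app.messages WHERE status = '${status}'`;
}
```

Any value outside the allowlist, including input that contains quotes, is rejected before a query string is built.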
function buildConversationSql(conversationId: number): string {
  if (!Number.isInteger(conversationId) || conversationId <= 0) {
    throw new Error('conversationId must be a positive integer');
  }
  return `SELECT * FROM chat.messages WHERE conversation_id = ${conversationId}`;
}
Unsubscribe patterns
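// 1. Call the function returned by subscribe().
const unsub = await client.subscribe('app.messages', cb);
await unsub();
// 2. Unsubscribe by subscription ID.
const subs = client.getSubscriptions();
if (subs[0]) {
  await client.unsubscribe(subs[0].id);
}
// 3. Drop every active subscription.
await client.unsubscribeAll();
Subscription introspection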
client.getSubscriptionCount();
client.getSubscriptions();
client.isSubscribedTo('SELECT * FROM app.messages');
client.getLastSeqId('<subscription-id>');
getLastSeqId() is useful for checkpointing consumption progress.
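As a rough illustration of checkpointing, the sketch below collects the last sequence ID of every active subscription into a plain object that can be serialized. `SeqSource`, `snapshotCheckpoints`, and `fakeClient` are hypothetical names introduced here; the interface covers only `getSubscriptions()` and `getLastSeqId()` as shown in this section, and the return types (including `undefined` when no event has been seen) are assumptions.

```typescript
// Minimal structural view of the client used by this sketch. Only the two
// methods shown in this section are assumed; the return types are guesses.
interface SeqSource {
  getSubscriptions(): Array<{ id: string }>;
  getLastSeqId(subscriptionId: string): string | undefined;
}

// Capture the last seen sequence ID of every active subscription.
// Subscriptions without a sequence ID yet are skipped.
function snapshotCheckpoints(client: SeqSource): Record<string, string> {
  const checkpoints: Record<string, string> = {};
  for (const sub of client.getSubscriptions()) {
    const seq = client.getLastSeqId(sub.id);
    if (seq) checkpoints[sub.id] = seq;
  }
  return checkpoints;
}

// In-memory stand-in for the real client, used only to exercise the sketch.
const fakeClient: SeqSource = {
  getSubscriptions: () => [{ id: 'sub-a' }, { id: 'sub-b' }],
  getLastSeqId: (id: string) => (id === 'sub-a' ? '1042' : undefined),
};

// Serialized form, ready to be written to whatever storage the app uses.
const saved: string = JSON.stringify(snapshotCheckpoints(fakeClient));
```

Whether subscription IDs stay stable across reconnects is not stated here, so keying checkpoints by the subscription's SQL text may be the safer choice in practice.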
Error handling guidance
- Check for event.type === MessageType.Error and inspect code/message.
- Keep callbacks resilient (non-throwing) to avoid app-level disruption.
- For mission-critical streams, persist last sequence IDs and re-subscribe with from_seq_id.
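The guidance above could be sketched with two small helpers. Both `resilient` and `resumeOptions` are hypothetical names, not kalam-link APIs: the first wraps a synchronous callback so a thrown exception is reported rather than propagated, and the second turns a persisted sequence ID string into a `from_seq_id` option, assuming sequence IDs are non-negative decimal integers.

```typescript
// Wrap a synchronous subscription callback so that an exception thrown by
// the handler is passed to onFailure instead of propagating to the caller.
// Rejected promises from async handlers are NOT caught by this wrapper.
function resilient<E>(
  handler: (event: E) => void,
  onFailure: (error: unknown, event: E) => void,
): (event: E) => void {
  return (event: E): void => {
    try {
      handler(event);
    } catch (error) {
      onFailure(error, event);
    }
  };
}

// Convert a stored sequence ID (a string) into subscription options.
// Returns undefined when nothing usable was stored, so the caller can
// fall back to a fresh subscription.
function resumeOptions(
  storedSeq: string | null | undefined,
): { from_seq_id: number } | undefined {
  if (!storedSeq) return undefined;
  // Accept plain decimal digits only (no sign, whitespace, or fraction).
  if (!/^\d+$/.test(storedSeq)) return undefined;
  const n = Number(storedSeq);
  // Reject values too large to be represented exactly as a number.
  if (!Number.isSafeInteger(n)) return undefined;
  return { from_seq_id: n };
}
```

A callback wrapped with `resilient` can be passed wherever a plain callback is expected, and the result of `resumeOptions` can be supplied as the options argument when re-subscribing.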