postgres-cdc
Postgres change-data-capture — logical replication slots (pgoutput / wal2json) and Supabase realtime, exposed as tool primitives + a stream helper.
import { postgresCdc, createCdcStream } from '@agentskit/tools'
const runtime = createRuntime({
  adapter,
  tools: [
    ...postgresCdc({
      admin: { execute: (sql, params) => pool.query(sql, params).then(r => ({ rows: r.rows })) },
      slotName: 'agentskit_slot',
      publication: 'agentskit_pub',
      plugin: 'pgoutput',
    }),
  ],
})
#Sub-tools
| Name | Purpose |
|---|---|
| postgresCdcStatus | Slot state, WAL lag, restart/confirmed LSN |
| postgresCdcCreateSlot | Create logical replication slot |
| postgresCdcDropSlot | Drop slot (frees WAL retention) |
| postgresCdcAdvance | Advance confirmed_flush_lsn past a checkpoint |
| postgresCdcPeek | Peek N pending changes without consuming |
Bundled: postgresCdc(config).
For the long-running stream of normalised CdcChangeEvents, use createCdcStream(config) — it yields an AsyncIterable and is consumed by AgentsKitOS triggers (trigger.cdc.* events, ADR-0005), not by the agent loop.
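For orientation, the sub-tools in the table line up with standard PostgreSQL replication functions. The sketch below is an assumption about what such tools could run, not the package's confirmed implementation: `cdcSql`, `SqlCall`, and the `retained_bytes` alias are hypothetical names, while the SQL functions and the `pg_replication_slots` view are part of PostgreSQL itself.

```typescript
// Hypothetical helper: sub-tool names come from the table above, but the SQL
// each one runs is an assumption. Only the PostgreSQL functions are real.
type SqlCall = { sql: string; params: unknown[] }
type Plugin = 'pgoutput' | 'wal2json'

const cdcSql = {
  // Slot state plus the WAL (in bytes) the slot is currently holding back.
  status: (slotName: string): SqlCall => ({
    sql:
      'SELECT slot_name, plugin, active, restart_lsn, confirmed_flush_lsn, ' +
      'pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes ' +
      'FROM pg_replication_slots WHERE slot_name = $1',
    params: [slotName],
  }),

  createSlot: (slotName: string, plugin: Plugin): SqlCall => ({
    sql: 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot($1, $2)',
    params: [slotName, plugin],
  }),

  // Dropping the slot releases the WAL it was retaining.
  dropSlot: (slotName: string): SqlCall => ({
    sql: 'SELECT pg_drop_replication_slot($1)',
    params: [slotName],
  }),

  // Moves confirmed_flush_lsn forward without decoding the skipped changes.
  advance: (slotName: string, uptoLsn: string): SqlCall => ({
    sql: 'SELECT slot_name, end_lsn FROM pg_replication_slot_advance($1, $2::pg_lsn)',
    params: [slotName, uptoLsn],
  }),

  // Peek reads pending changes without consuming them. pgoutput emits binary
  // data, so it needs the *_binary_changes variant plus its required options.
  peek: (
    slotName: string,
    plugin: Plugin,
    limit: number,
    maxPeek = 100,
    publication?: string,
  ): SqlCall => {
    const n = Math.min(limit, maxPeek) // enforce the cap, mirroring maxPeek
    if (plugin === 'wal2json') {
      return {
        sql: 'SELECT lsn, xid, data FROM pg_logical_slot_peek_changes($1, NULL, $2)',
        params: [slotName, n],
      }
    }
    if (!publication) throw new Error('pgoutput requires a publication name')
    return {
      sql:
        'SELECT lsn, xid, data FROM pg_logical_slot_peek_binary_changes(' +
        "$1, NULL, $2, 'proto_version', '1', 'publication_names', $3)",
      params: [slotName, n, publication],
    }
  },
}
```

Each `SqlCall` can be passed to an `admin.execute(sql, params)` runner like the one in the setup snippet. Note that `pg_current_wal_lsn()` only works on a primary, so the status query as written would fail on a standby.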
#Config
type PostgresCdcConfig = {
  admin?: CdcAdminClient // any parameterised SQL runner (pg, Neon, Supabase)
  stream?: CdcStreamClient // long-running consumer (pg-logical-replication / supabase-realtime)
  slotName: string
  publication?: string
  plugin?: 'pgoutput' | 'wal2json'
  maxPeek?: number // cap on peek (default 100)
}

type CdcChangeEvent = {
  op: 'insert' | 'update' | 'delete' | 'truncate' | 'schema'
  schema: string
  table: string
  lsn?: string
  commitTs?: string
  before?: Record<string, unknown>
  after?: Record<string, unknown>
}

pg-logical-replication, wal2json, and @supabase/realtime-js are not bundled; pass an adapter so this package stays driver-free.
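Because the drivers are not bundled, an adapter has to map raw plugin output onto `CdcChangeEvent`. The following is a minimal sketch of that normalisation step for wal2json, assuming its format-version 2 output (one JSON object per change with `action`, `schema`, `table`, `columns`, and `identity`). `Wal2JsonV2`, `toRecord`, and `normalizeWal2Json` are hypothetical names, and the `lsn` field is assumed to be present only when wal2json's `include-lsn` option is enabled.

```typescript
// CdcChangeEvent is copied from the Config section so the sketch stands alone.
type CdcChangeEvent = {
  op: 'insert' | 'update' | 'delete' | 'truncate' | 'schema'
  schema: string
  table: string
  lsn?: string
  commitTs?: string
  before?: Record<string, unknown>
  after?: Record<string, unknown>
}

// Assumed shape of a wal2json format-version 2 message.
type Wal2JsonColumn = { name: string; value: unknown }
type Wal2JsonV2 = {
  action: string // 'I', 'U', 'D', 'T' for changes; 'B', 'C', 'M' otherwise
  schema?: string
  table?: string
  lsn?: string
  columns?: Wal2JsonColumn[] // new row values (insert / update)
  identity?: Wal2JsonColumn[] // replica-identity values (update / delete)
}

// Collapse wal2json's column array into a plain name -> value record.
function toRecord(cols?: Wal2JsonColumn[]): Record<string, unknown> | undefined {
  if (!cols) return undefined
  const out: Record<string, unknown> = {}
  for (const c of cols) out[c.name] = c.value
  return out
}

const OPS: Record<string, CdcChangeEvent['op'] | undefined> = {
  I: 'insert',
  U: 'update',
  D: 'delete',
  T: 'truncate',
}

// Returns null for begin / commit / message records, which carry no row change.
function normalizeWal2Json(msg: Wal2JsonV2): CdcChangeEvent | null {
  const op = OPS[msg.action]
  if (!op || !msg.schema || !msg.table) return null
  return {
    op,
    schema: msg.schema,
    table: msg.table,
    lsn: msg.lsn,
    before: toRecord(msg.identity),
    after: toRecord(msg.columns),
  }
}
```

With the default replica identity, `before` holds only the key columns of the old row; a table needs `REPLICA IDENTITY FULL` for the complete previous row to be available.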
#Example — slot bootstrap + stream
import { postgresCdc, createCdcStream } from '@agentskit/tools'
await runtime.run('Create the CDC slot if missing and report status.')
const stream = createCdcStream({
  stream: myLogicalReplicationAdapter, // wraps pg-logical-replication
  slotName: 'agentskit_slot',
})
for await (const change of stream) {
  await runtime.run(`Row ${change.op} on ${change.schema}.${change.table}: ${JSON.stringify(change.after)}`)
}
#Operational notes
- WAL retention: an unconsumed slot grows pg_wal indefinitely. Drop slots you no longer use.
- Reconnect / backoff: handle in your adapter. Emit `op: 'schema'` events on column add/drop so the agent can react.
- Initial snapshot: use a `COPY` snapshot before streaming if the agent must see existing rows.
- Supabase: use `realtime-js` and skip slot management; auth is RLS-scoped.
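To make the WAL retention note concrete, here is a small sketch that turns two LSNs from a status check into a byte lag and flags a slot that has fallen too far behind. `parseLsn`, `walLagBytes`, `slotNeedsAttention`, and the 1 GiB default threshold are hypothetical choices for illustration. The only fact relied on is PostgreSQL's LSN text format: two hexadecimal numbers separated by a slash, where the first is the high 32 bits of the position.

```typescript
// Split an LSN such as '16/B374D848' into its high and low 32-bit halves.
function parseLsn(lsn: string): { hi: number; lo: number } {
  const m = /^([0-9A-Fa-f]{1,8})\/([0-9A-Fa-f]{1,8})$/.exec(lsn)
  if (!m) throw new Error(`Invalid LSN: ${lsn}`)
  return { hi: parseInt(m[1], 16), lo: parseInt(m[2], 16) }
}

// Bytes of WAL between the server's current position and the slot's position.
// Subtracting the halves separately keeps the result exact for any lag below
// 2^53 bytes, without needing BigInt.
function walLagBytes(currentLsn: string, slotLsn: string): number {
  const cur = parseLsn(currentLsn)
  const slot = parseLsn(slotLsn)
  return (cur.hi - slot.hi) * 4294967296 + (cur.lo - slot.lo)
}

// Hypothetical policy: flag slots retaining more than maxBytes (default 1 GiB).
function slotNeedsAttention(
  currentLsn: string,
  slotLsn: string,
  maxBytes = 1024 * 1024 * 1024,
): boolean {
  return walLagBytes(currentLsn, slotLsn) > maxBytes
}
```

A check like this could run on a schedule against the slot's restart LSN, with the result deciding whether to alert, advance the slot, or drop it.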
#Safety
- CDC payloads can include sensitive columns. Filter with a column allowlist before feeding to the model.
- Consider redaction for any sink that persists events.
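A column allowlist can be applied to each event before it reaches the model or a sink. The sketch below assumes the `CdcChangeEvent` shape from the Config section; `ColumnAllowlist`, `pickColumns`, and `applyAllowlist` are hypothetical names, and denying every column for tables missing from the allowlist is a design choice of this example rather than documented package behaviour.

```typescript
// CdcChangeEvent is copied from the Config section so the sketch stands alone.
type CdcChangeEvent = {
  op: 'insert' | 'update' | 'delete' | 'truncate' | 'schema'
  schema: string
  table: string
  lsn?: string
  commitTs?: string
  before?: Record<string, unknown>
  after?: Record<string, unknown>
}

// Keys are 'schema.table'; values are the columns that may be exposed.
type ColumnAllowlist = Record<string, readonly string[]>

// Keep only the allowed columns of a row image; undefined stays undefined.
function pickColumns(
  row: Record<string, unknown> | undefined,
  allowed: readonly string[],
): Record<string, unknown> | undefined {
  if (!row) return undefined
  const out: Record<string, unknown> = {}
  for (const col of allowed) {
    if (col in row) out[col] = row[col]
  }
  return out
}

// Deny by default: a table absent from the allowlist exposes no columns.
function applyAllowlist(change: CdcChangeEvent, allow: ColumnAllowlist): CdcChangeEvent {
  const allowed = allow[`${change.schema}.${change.table}`] ?? []
  return {
    ...change,
    before: pickColumns(change.before, allowed),
    after: pickColumns(change.after, allowed),
  }
}
```

In the streaming loop from the example, this would mean calling `applyAllowlist(change, allow)` and serialising the result instead of the raw `change.after`.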