postgres-cdc

Postgres change data capture (CDC) through logical replication slots (pgoutput or wal2json) or Supabase Realtime, exposed as a set of tool primitives plus a stream helper.

import { postgresCdc } from '@agentskit/tools'

// Assumes `createRuntime`, `adapter`, and a pg `pool` are already in scope.
const runtime = createRuntime({
  adapter,
  tools: [
    ...postgresCdc({
      admin: { execute: (sql, params) => pool.query(sql, params).then(r => ({ rows: r.rows })) },
      slotName: 'agentskit_slot',
      publication: 'agentskit_pub',
      plugin: 'pgoutput',
    }),
  ],
})

#Sub-tools

| Name | Purpose |
| --- | --- |
| postgresCdcStatus | Slot state, WAL lag, restart/confirmed LSN |
| postgresCdcCreateSlot | Create logical replication slot |
| postgresCdcDropSlot | Drop slot (frees WAL retention) |
| postgresCdcAdvance | Advance confirmed_flush_lsn past a checkpoint |
| postgresCdcPeek | Peek N pending changes without consuming |

All five are bundled by postgresCdc(config), which returns them as a list you can spread into tools.
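For orientation, each sub-tool lines up with one of PostgreSQL's built-in replication-slot functions. The sketch below shows the kind of parameterised SQL such tools could send through admin.execute. It is an assumption for illustration, not the package's actual implementation; `slotStatements` and `SqlStatement` are hypothetical names.

```typescript
// Hypothetical helper: one parameterised statement per sub-tool, built only on
// PostgreSQL's standard replication-slot functions. Not AgentsKit internals.
type SqlStatement = { sql: string; params: unknown[] }

function slotStatements(slotName: string, plugin: 'pgoutput' | 'wal2json' = 'pgoutput') {
  return {
    // Slot state plus the bytes of WAL retained behind the current write position.
    status: (): SqlStatement => ({
      sql: `SELECT slot_name, plugin, active, restart_lsn, confirmed_flush_lsn,
                   pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
            FROM pg_replication_slots WHERE slot_name = $1`,
      params: [slotName],
    }),
    create: (): SqlStatement => ({
      sql: 'SELECT * FROM pg_create_logical_replication_slot($1, $2)',
      params: [slotName, plugin],
    }),
    drop: (): SqlStatement => ({
      sql: 'SELECT pg_drop_replication_slot($1)',
      params: [slotName],
    }),
    // Moves the slot forward; changes before the target LSN can no longer be read.
    advance: (uptoLsn: string): SqlStatement => ({
      sql: 'SELECT * FROM pg_replication_slot_advance($1, $2::pg_lsn)',
      params: [slotName, uptoLsn],
    }),
    // Peeking leaves the slot position untouched. The text variant suits wal2json;
    // pgoutput is binary and would need pg_logical_slot_peek_binary_changes with
    // the proto_version and publication_names options.
    peek: (limit: number): SqlStatement => ({
      sql: 'SELECT lsn, xid, data FROM pg_logical_slot_peek_changes($1, NULL, $2)',
      params: [slotName, limit],
    }),
  }
}

const statements = slotStatements('agentskit_slot', 'wal2json')
console.log(statements.create())
```

Keeping the slot name and limits as bind parameters rather than interpolating them matters here, because the values may originate from model-generated tool arguments.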

For the long-running stream of normalised CdcChangeEvents, use createCdcStream(config) — it yields an AsyncIterable and is consumed by AgentsKitOS triggers (trigger.cdc.* events, ADR-0005), not by the agent loop.

#Config

type PostgresCdcConfig = {
  admin?: CdcAdminClient                    // any parameterised SQL runner (pg, Neon, Supabase)
  stream?: CdcStreamClient                  // long-running consumer (pg-logical-replication / supabase-realtime)
  slotName: string
  publication?: string
  plugin?: 'pgoutput' | 'wal2json'
  maxPeek?: number                          // cap on peek (default 100)
}

type CdcChangeEvent = {
  op: 'insert' | 'update' | 'delete' | 'truncate' | 'schema'
  schema: string
  table: string
  lsn?: string
  commitTs?: string
  before?: Record<string, unknown>
  after?: Record<string, unknown>
}
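As an illustration of what "normalised" means here, the sketch below maps a wal2json format-version 2 message onto the CdcChangeEvent shape above. It assumes the adapter has already parsed the JSON payload; `Wal2JsonV2Message` and `normaliseWal2Json` are hypothetical names, and the package's own mapping may differ.

```typescript
type CdcChangeEvent = {
  op: 'insert' | 'update' | 'delete' | 'truncate' | 'schema'
  schema: string
  table: string
  lsn?: string
  commitTs?: string
  before?: Record<string, unknown>
  after?: Record<string, unknown>
}

// Subset of a wal2json format-version 2 message.
type Wal2JsonColumn = { name: string; value: unknown }
type Wal2JsonV2Message = {
  action: 'B' | 'C' | 'I' | 'U' | 'D' | 'T' | 'M'
  schema?: string
  table?: string
  columns?: Wal2JsonColumn[]  // new values, present for inserts and updates
  identity?: Wal2JsonColumn[] // replica-identity (old key) values for updates and deletes
}

const OPS = { I: 'insert', U: 'update', D: 'delete', T: 'truncate' } as const

// Turn wal2json's [{ name, value }] arrays into a plain column -> value record.
const toRecord = (cols?: Wal2JsonColumn[]): Record<string, unknown> | undefined =>
  cols ? Object.fromEntries(cols.map(c => [c.name, c.value])) : undefined

function normaliseWal2Json(msg: Wal2JsonV2Message, lsn?: string): CdcChangeEvent | null {
  // Begin, commit, and logical-message records carry no row change.
  if (msg.action === 'B' || msg.action === 'C' || msg.action === 'M') return null
  return {
    op: OPS[msg.action],
    schema: msg.schema ?? '',
    table: msg.table ?? '',
    lsn,
    before: toRecord(msg.identity),
    after: toRecord(msg.columns),
  }
}

const updateEvent = normaliseWal2Json(
  {
    action: 'U',
    schema: 'public',
    table: 'users',
    columns: [{ name: 'id', value: 1 }, { name: 'email', value: 'b@example.com' }],
    identity: [{ name: 'id', value: 1 }],
  },
  '0/16B3748',
)
console.log(updateEvent)
```

Note that `before` only holds the replica-identity columns unless the table is set to REPLICA IDENTITY FULL, so consumers should not assume a complete old row.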

pg-logical-replication, wal2json, and @supabase/realtime-js are not bundled — pass an adapter so this package stays driver-free.
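Because the driver is injected, the admin side can be wrapped or faked in a few lines. The sketch below assumes CdcAdminClient has the single `execute(sql, params)` method suggested by the first snippet on this page; `AdminClientLike`, `Queryable`, and `toAdminClient` are local stand-ins, not exported types.

```typescript
// Local stand-in for the admin client shape implied by the quick-start snippet.
type AdminClientLike = {
  execute: (sql: string, params?: unknown[]) => Promise<{ rows: Record<string, unknown>[] }>
}

// Minimal structural type modelled on pg's pool.query(text, values).
type Queryable = {
  query: (sql: string, params?: unknown[]) => Promise<{ rows: Record<string, unknown>[] }>
}

// Adapts any Queryable to the admin shape, dropping driver-specific result fields.
function toAdminClient(db: Queryable): AdminClientLike {
  return {
    execute: (sql, params = []) => db.query(sql, params).then(r => ({ rows: r.rows })),
  }
}

// An in-memory fake is enough to exercise the tools in tests without a database.
const recordedCalls: { sql: string; params?: unknown[] }[] = []
const fakeDb: Queryable = {
  query: (sql, params) => {
    recordedCalls.push({ sql, params })
    return Promise.resolve({ rows: [{ slot_name: 'agentskit_slot', active: false }] })
  },
}

const fakeAdmin = toAdminClient(fakeDb)
fakeAdmin
  .execute('SELECT * FROM pg_replication_slots WHERE slot_name = $1', ['agentskit_slot'])
  .then(result => console.log(result.rows))
```

The same wrapper could sit in front of a Neon or Supabase SQL client, as long as it returns rows for a parameterised query.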

#Example — slot bootstrap + stream

import { postgresCdc, createCdcStream } from '@agentskit/tools'

await runtime.run('Create the CDC slot if missing and report status.')

const stream = createCdcStream({
  stream: myLogicalReplicationAdapter,      // wraps pg-logical-replication
  slotName: 'agentskit_slot',
})

for await (const change of stream) {
  // Deletes carry no `after` image, so fall back to `before`.
  await runtime.run(`Row ${change.op} on ${change.schema}.${change.table}: ${JSON.stringify(change.after ?? change.before ?? {})}`)
}

#Operational notes

  • WAL retention: an unconsumed slot grows pg_wal indefinitely. Drop slots you no longer use.
  • Reconnect / backoff: handle both in your adapter. The adapter should also emit op: 'schema' events on column add/drop so the agent can react.
  • Initial snapshot: use a COPY snapshot before streaming if the agent must see existing rows.
  • Supabase: use realtime-js and skip slot management; auth is RLS-scoped.
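To act on the WAL retention note, it helps to turn the LSNs reported for a slot into a byte count and alert past a threshold. The sketch below assumes LSNs arrive in PostgreSQL's textual `high/low` hexadecimal form; `parseLsn`, `walLagBytes`, `isSlotStale`, and the 1 GiB default are illustrative choices, not part of the package.

```typescript
// Parse PostgreSQL's textual LSN ("high/low", both hexadecimal) into a byte offset.
function parseLsn(lsn: string): bigint {
  const match = /^([0-9A-Fa-f]{1,8})\/([0-9A-Fa-f]{1,8})$/.exec(lsn)
  if (!match) throw new Error(`Invalid LSN: ${lsn}`)
  // The high part counts 4 GiB (2^32 byte) segments of the WAL address space.
  return BigInt(`0x${match[1]}`) * BigInt(4294967296) + BigInt(`0x${match[2]}`)
}

// Bytes of WAL between the server's current position and the slot's position.
function walLagBytes(currentLsn: string, slotLsn: string): bigint {
  return parseLsn(currentLsn) - parseLsn(slotLsn)
}

// Flags a slot whose retained WAL exceeds maxBytes (default 1 GiB, an arbitrary choice).
function isSlotStale(currentLsn: string, restartLsn: string, maxBytes: bigint = BigInt(1073741824)): boolean {
  return walLagBytes(currentLsn, restartLsn) > maxBytes
}

console.log(walLagBytes('0/2000000', '0/1000000')) // 0x1000000 bytes, i.e. 16 MiB
console.log(isSlotStale('1/0', '0/0'))             // 4 GiB behind, above the 1 GiB threshold
```

On the server side, `pg_wal_lsn_diff` gives the same number directly; the client-side version is useful when only the LSN strings are available.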

#Safety

  • CDC payloads can include sensitive columns. Filter with a column allowlist before feeding to the model.
  • Consider redaction for any sink that persists events.

#Related

  • postgres: query mode.
  • AgentsKitOS triggers: trigger.cdc.* event bus.
  • Issues #728, #812.
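The column allowlist recommended under Safety can be sketched as a small pure function applied to each event before it reaches the model or a sink. `ColumnAllowlist`, `pickColumns`, and `applyAllowlist` are hypothetical names; the deny-by-default behaviour for unlisted tables is a design assumption, not documented package behaviour.

```typescript
type CdcChangeEvent = {
  op: 'insert' | 'update' | 'delete' | 'truncate' | 'schema'
  schema: string
  table: string
  lsn?: string
  commitTs?: string
  before?: Record<string, unknown>
  after?: Record<string, unknown>
}

// Allowlist keyed by "schema.table"; tables that are not listed expose no columns.
type ColumnAllowlist = Record<string, readonly string[]>

// Keep only the allowed columns of a row image; absent images stay absent.
function pickColumns(
  row: Record<string, unknown> | undefined,
  allowed: readonly string[],
): Record<string, unknown> | undefined {
  if (!row) return undefined
  return Object.fromEntries(Object.entries(row).filter(([col]) => allowed.includes(col)))
}

// Returns a copy of the event with both row images filtered; the input is not mutated.
function applyAllowlist(change: CdcChangeEvent, allowlist: ColumnAllowlist): CdcChangeEvent {
  const allowed = allowlist[`${change.schema}.${change.table}`] ?? []
  return {
    ...change,
    before: pickColumns(change.before, allowed),
    after: pickColumns(change.after, allowed),
  }
}

const safeEvent = applyAllowlist(
  {
    op: 'insert',
    schema: 'public',
    table: 'users',
    after: { id: 1, email: 'a@example.com', password_hash: 'secret' },
  },
  { 'public.users': ['id', 'email'] },
)
console.log(safeEvent.after)
```

An allowlist fails closed when a new sensitive column is added to a table, which a denylist would not.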
