Recipes
Trigger adapters (email / teams / postgres-cdc)
Reference adapter snippets that wrap heavy drivers (nodemailer, imapflow, botbuilder, pg-logical-replication) into the driver-free contracts the AgentsKitOS triggers package consumes.
The @agentskit/tools integrations stay driver-free by accepting small client adapters. Here are reference wrappers for the three integrations AgentsKitOS triggers most commonly consume.
# Email — nodemailer + imapflow
import nodemailer from 'nodemailer'
import { ImapFlow } from 'imapflow'
import { simpleParser } from 'mailparser'
import type { EmailTransport, ImapClient } from '@agentskit/tools'
/** SMTP port from `SMTP_PORT`; falls back to 587 when unset or not a positive integer. */
function smtpPort(): number {
  const parsed = Number(process.env.SMTP_PORT)
  return Number.isInteger(parsed) && parsed > 0 ? parsed : 587
}

/** Builds the nodemailer transport from the `SMTP_*` environment variables. */
const createSmtpTransport = () => {
  const port = smtpPort()
  return nodemailer.createTransport({
    host: process.env.SMTP_HOST,
    port,
    // Port 465 is implicit TLS; every other port negotiates STARTTLS when the server offers it.
    secure: port === 465,
    auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS },
  })
}

// Built on first send and reused afterwards, so the environment is read once rather than per message.
let smtpTransport: ReturnType<typeof createSmtpTransport> | undefined

/** Flattens nodemailer's `string | { address }` recipient entries to plain address strings. */
function toAddressList(entries: ReadonlyArray<string | { address: string }> | undefined): string[] {
  return (entries ?? []).map(entry => (typeof entry === 'string' ? entry : entry.address))
}

/**
 * `EmailTransport` backed by nodemailer over SMTP.
 *
 * `send` forwards the message fields to `sendMail` and returns the message id
 * together with the accepted and rejected recipient addresses. Attachments
 * that carry `contentBase64` are decoded to a Buffer; otherwise `content` is
 * passed through unchanged.
 */
export const emailTransport: EmailTransport = {
  send: async msg => {
    smtpTransport ??= createSmtpTransport()
    const info = await smtpTransport.sendMail({
      from: msg.from,
      to: msg.to,
      cc: msg.cc,
      bcc: msg.bcc,
      subject: msg.subject,
      text: msg.text,
      html: msg.html,
      attachments: msg.attachments?.map(a => ({
        filename: a.filename,
        content: a.contentBase64 ? Buffer.from(a.contentBase64, 'base64') : a.content,
        contentType: a.contentType,
      })),
    })
    return {
      messageId: info.messageId,
      accepted: toAddressList(info.accepted),
      rejected: toAddressList(info.rejected),
    }
  },
}
/**
 * `ImapClient` backed by imapflow + mailparser.
 *
 * `fetch` opens a fresh IMAP connection (IMAP_HOST / IMAP_USER / IMAP_PASS,
 * port 993 over TLS), locks `opts.mailbox` (default `INBOX`), and returns up
 * to `opts.maxFetch` (default 50) parsed messages matching the optional
 * `unseenOnly`, `since`, `from` and `subject` filters. The mailbox lock is
 * released and the connection logged out on every exit path.
 */
export const imapClient: ImapClient = {
  fetch: async opts => {
    const limit = opts.maxFetch ?? 50
    // A non-positive limit can never be satisfied; skip the connection entirely.
    if (limit <= 0) return []

    // Only include criteria the caller actually set: keys holding `undefined`
    // are not guaranteed to be ignored by the IMAP search compiler.
    const search: { all?: boolean; seen?: boolean; since?: Date; from?: string; subject?: string } = {}
    if (opts.unseenOnly) search.seen = false
    if (opts.since) search.since = new Date(opts.since)
    if (opts.from) search.from = opts.from
    if (opts.subject) search.subject = opts.subject
    if (Object.keys(search).length === 0) search.all = true

    const client = new ImapFlow({
      host: process.env.IMAP_HOST!,
      port: 993,
      secure: true,
      auth: { user: process.env.IMAP_USER!, pass: process.env.IMAP_PASS! },
    })
    await client.connect()
    try {
      // Acquired inside the outer try so a failed lock (e.g. unknown mailbox) still logs out.
      const lock = await client.getMailboxLock(opts.mailbox ?? 'INBOX')
      try {
        const messages = []
        for await (const msg of client.fetch(search, { source: true, envelope: true, uid: true })) {
          if (!msg.source) continue
          const parsed = await simpleParser(msg.source)
          // mailparser yields an array of header objects when the header is repeated.
          const toHeaders = Array.isArray(parsed.to) ? parsed.to : parsed.to ? [parsed.to] : []
          messages.push({
            id: parsed.messageId ?? String(msg.uid),
            uid: msg.uid,
            from: parsed.from?.text ?? '',
            to: toHeaders
              .flatMap(header => header.value)
              .map(entry => entry.address)
              .filter((address): address is string => Boolean(address)),
            subject: parsed.subject ?? '',
            date: (parsed.date ?? new Date()).toISOString(),
            text: parsed.text,
            html: typeof parsed.html === 'string' ? parsed.html : undefined,
          })
          if (messages.length >= limit) break
        }
        return messages
      } finally {
        lock.release()
      }
    } finally {
      await client.logout()
    }
  },
}
# Microsoft Teams — botbuilder
import { BotFrameworkAdapter, TurnContext } from 'botbuilder'
import type { TeamsBotClient } from '@agentskit/tools'
// Shared Bot Framework adapter, constructed once at module load from the
// MS_APP_ID / MS_APP_PASSWORD environment variables. The `!` assertions only
// silence the compiler; nothing here checks at runtime that they are set.
const adapter = new BotFrameworkAdapter({
appId: process.env.MS_APP_ID!,
appPassword: process.env.MS_APP_PASSWORD!,
})
/**
 * `TeamsBotClient` backed by the Bot Framework adapter.
 *
 * `send` resumes the conversation identified by `msg.conversationId` and posts
 * a single message activity, optionally carrying a card attachment and a
 * `replyToId`. It resolves with the id reported by `sendActivity` (an empty
 * string when none is returned) and the conversation id.
 */
export const teamsBotClient: TeamsBotClient = {
  async send(msg) {
    const { conversationId } = msg
    const reference = {
      conversation: { id: conversationId },
      serviceUrl: msg.serviceUrl ?? 'https://smba.trafficmanager.net/amer/',
      channelId: 'msteams',
      bot: { id: process.env.MS_APP_ID! },
    }

    // The turn callback returns nothing, so the id is captured through this closure variable.
    let sentId = ''
    await adapter.continueConversation(reference as any, async (turn: TurnContext) => {
      const outgoing: any = {
        type: 'message',
        text: msg.text,
        ...(msg.card ? { attachments: [msg.card] } : {}),
        ...(msg.replyToId ? { replyToId: msg.replyToId } : {}),
      }
      const receipt = await turn.sendActivity(outgoing)
      sentId = receipt?.id ?? ''
    })

    return { id: sentId, conversationId }
  },
}
# Postgres CDC — pg-logical-replication
import { LogicalReplicationService, PgoutputPlugin } from 'pg-logical-replication'
import type { CdcAdminClient, CdcStreamClient, CdcChangeEvent } from '@agentskit/tools'
import { Pool } from 'pg'
// Module-level pg connection pool built from DATABASE_URL; `cdcAdmin` runs its queries through it.
const pool = new Pool({ connectionString: process.env.DATABASE_URL })
/**
 * `CdcAdminClient` that runs SQL through the shared pg pool.
 *
 * `execute` forwards the statement and its parameters to `pool.query` and
 * returns the rows plus the row count, mapping a `null` count to `undefined`.
 */
export const cdcAdmin: CdcAdminClient = {
  async execute(sql, params) {
    const { rows, rowCount } = await pool.query(sql, params as unknown[])
    return { rows, rowCount: rowCount ?? undefined }
  },
}
/**
 * Builds a `CdcStreamClient` that streams changes for `publication` through
 * the logical replication slot `slot` using the pgoutput plugin.
 *
 * `stream()` returns an async iterable of change events. Iteration ends when
 * `signal` aborts or the replication subscription finishes, and it throws if
 * the subscription fails. The replication service is stopped and its listeners
 * removed whenever iteration ends, including when the consumer breaks out of
 * the loop early.
 *
 * @param slot Name of the logical replication slot to read from.
 * @param publication Name of the publication passed to the pgoutput plugin.
 */
export function cdcStream(slot: string, publication: string): CdcStreamClient {
  const opByTag: Record<string, CdcChangeEvent['op']> = {
    insert: 'insert',
    update: 'update',
    delete: 'delete',
    truncate: 'truncate',
    relation: 'schema',
  }

  return {
    stream: ({ signal, startLsn } = {}) => {
      const service = new LogicalReplicationService({ connectionString: process.env.DATABASE_URL })
      const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publication] })

      async function* iterate(): AsyncIterable<CdcChangeEvent> {
        // Already cancelled: never open the replication connection.
        if (signal?.aborted) return

        const queue: CdcChangeEvent[] = []
        let wake: (() => void) | null = null
        let failed = false
        let failure: unknown
        let finished = false
        let stopping: Promise<unknown> | null = null

        // Resolves the pending wait, if any, so the loop re-checks its exit conditions.
        const notify = () => {
          const pending = wake
          wake = null
          pending?.()
        }
        // Guarded so the abort handler and the cleanup path never stop the service twice.
        const stop = () => {
          if (!stopping) stopping = service.stop().catch(() => undefined)
          return stopping
        }
        const onData = (lsn: string, log: any) => {
          const op = log?.tag ? opByTag[log.tag] : undefined
          if (!op) return
          // Assumes the pgoutput message shapes: row messages nest the table under
          // `relation`, a relation message is the table itself, and truncate lists
          // `relations` — TODO confirm against the installed pg-logical-replication version.
          const relation = log.tag === 'relation' ? log : log.relation ?? log.relations?.[0]
          queue.push({
            op,
            schema: relation?.schema ?? '',
            table: relation?.name ?? '',
            lsn,
            before: log.old,
            after: log.new,
          })
          notify()
        }
        const onAbort = () => {
          void stop()
          notify()
        }

        service.on('data', onData)
        signal?.addEventListener('abort', onAbort, { once: true })
        // Record how the subscription ends instead of swallowing it, so the
        // consumer is woken up rather than left waiting forever.
        service.subscribe(plugin, slot, startLsn).then(
          () => {
            finished = true
            notify()
          },
          (err: unknown) => {
            failed = true
            failure = err
            notify()
          },
        )

        try {
          while (true) {
            while (queue.length) yield queue.shift()!
            // Abort is checked first so a connection error caused by stopping is not reported.
            if (signal?.aborted) return
            if (failed) throw failure
            if (finished) return
            await new Promise<void>(r => {
              wake = r
            })
          }
        } finally {
          signal?.removeEventListener('abort', onAbort)
          service.removeListener('data', onData)
          await stop()
        }
      }

      return iterate()
    },
  }
}
# Wiring into AgentsKitOS triggers
import { createChatTrigger } from '@agentskit/triggers'
import { email } from '@agentskit/tools'
import { emailTransport, imapClient } from './email-adapter'
// Polls INBOX for unseen mail every 60 seconds and exposes the email tools for outbound replies.
// NOTE(review): `runtime` is not defined in this snippet — presumably your AgentsKit runtime instance; confirm.
const poll = { intervalMs: 60_000, mailbox: 'INBOX', unseenOnly: true }
const outboundTools = [...email({ transport: emailTransport, defaultFrom: 'bot@example.com' })]

createChatTrigger({ source: 'imap', client: imapClient, poll, runtime, outboundTools })
# Related
- email · teams · postgres-cdc
- Issue #772.
Explore nearby
- PeerRecipes
Copy-paste solutions grouped by theme. Every recipe is end-to-end and runs as written.
- PeerCustom adapter
Wrap any LLM API as an AgentsKit adapter. Plug-and-play with the rest of the kit in 30 lines.
- PeerAdapter contract tests
Verify any adapter against the ADR 0001 invariants A1–A10 with the shared test harness.