agentskit.js
Recipes

Trigger adapters (email / teams / postgres-cdc)

Reference adapter snippets that wrap heavy drivers (nodemailer, imapflow, botbuilder, pg-logical-replication) into the driver-free contracts the AgentsKitOS triggers package consumes.

The @agentskit/tools integrations stay driver-free by accepting small client adapters. Here are reference wrappers for the three integrations AgentsKitOS triggers most commonly consume.

#Email β€” nodemailer + imapflow

import nodemailer from 'nodemailer'
import { ImapFlow } from 'imapflow'
import { simpleParser } from 'mailparser'
import type { EmailTransport, ImapClient } from '@agentskit/tools'

export const emailTransport: EmailTransport = {
  send: async msg => {
    const transport = nodemailer.createTransport({
      host: process.env.SMTP_HOST,
      port: Number(process.env.SMTP_PORT ?? 587),
      auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS },
    })
    const info = await transport.sendMail({
      from: msg.from,
      to: msg.to,
      cc: msg.cc,
      bcc: msg.bcc,
      subject: msg.subject,
      text: msg.text,
      html: msg.html,
      attachments: msg.attachments?.map(a => ({
        filename: a.filename,
        content: a.contentBase64 ? Buffer.from(a.contentBase64, 'base64') : a.content,
        contentType: a.contentType,
      })),
    })
    return { messageId: info.messageId, accepted: info.accepted as string[], rejected: info.rejected as string[] }
  },
}

export const imapClient: ImapClient = {
  fetch: async opts => {
    const client = new ImapFlow({
      host: process.env.IMAP_HOST!,
      port: 993,
      secure: true,
      auth: { user: process.env.IMAP_USER!, pass: process.env.IMAP_PASS! },
    })
    await client.connect()
    const lock = await client.getMailboxLock(opts.mailbox ?? 'INBOX')
    try {
      const search = { seen: opts.unseenOnly ? false : undefined, since: opts.since ? new Date(opts.since) : undefined, from: opts.from, subject: opts.subject }
      const messages = []
      for await (const msg of client.fetch(search, { source: true, envelope: true, uid: true })) {
        const parsed = await simpleParser(msg.source!)
        messages.push({
          id: parsed.messageId ?? String(msg.uid),
          uid: msg.uid,
          from: parsed.from?.text ?? '',
          to: (parsed.to?.value ?? []).map(a => a.address!).filter(Boolean),
          subject: parsed.subject ?? '',
          date: (parsed.date ?? new Date()).toISOString(),
          text: parsed.text,
          html: typeof parsed.html === 'string' ? parsed.html : undefined,
        })
        if (messages.length >= (opts.maxFetch ?? 50)) break
      }
      return messages
    } finally {
      lock.release()
      await client.logout()
    }
  },
}

#Microsoft Teams β€” botbuilder

import { BotFrameworkAdapter, TurnContext } from 'botbuilder'
import type { TeamsBotClient } from '@agentskit/tools'

const adapter = new BotFrameworkAdapter({
  appId: process.env.MS_APP_ID!,
  appPassword: process.env.MS_APP_PASSWORD!,
})

export const teamsBotClient: TeamsBotClient = {
  send: async msg => {
    const ref = {
      conversation: { id: msg.conversationId },
      serviceUrl: msg.serviceUrl ?? 'https://smba.trafficmanager.net/amer/',
      channelId: 'msteams',
      bot: { id: process.env.MS_APP_ID! },
    }

    let resourceId = ''
    await adapter.continueConversation(ref as any, async (ctx: TurnContext) => {
      const activity: any = { type: 'message', text: msg.text }
      if (msg.card) activity.attachments = [msg.card]
      if (msg.replyToId) activity.replyToId = msg.replyToId
      const res = await ctx.sendActivity(activity)
      resourceId = res?.id ?? ''
    })

    return { id: resourceId, conversationId: msg.conversationId }
  },
}

#Postgres CDC β€” pg-logical-replication

import { LogicalReplicationService, PgoutputPlugin } from 'pg-logical-replication'
import type { CdcAdminClient, CdcStreamClient, CdcChangeEvent } from '@agentskit/tools'
import { Pool } from 'pg'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

export const cdcAdmin: CdcAdminClient = {
  execute: async (sql, params) => {
    const res = await pool.query(sql, params as unknown[])
    return { rows: res.rows, rowCount: res.rowCount ?? undefined }
  },
}

export function cdcStream(slot: string, publication: string): CdcStreamClient {
  return {
    stream: ({ signal, startLsn } = {}) => {
      const service = new LogicalReplicationService({ connectionString: process.env.DATABASE_URL })
      const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames: [publication] })

      async function* iterate(): AsyncIterable<CdcChangeEvent> {
        const queue: CdcChangeEvent[] = []
        let resolve: (() => void) | null = null

        service.on('data', (lsn: string, log: any) => {
          if (!log?.tag) return
          const map: Record<string, CdcChangeEvent['op']> = { insert: 'insert', update: 'update', delete: 'delete', truncate: 'truncate', relation: 'schema' }
          const op = map[log.tag]
          if (!op) return
          queue.push({ op, schema: log.schema ?? '', table: log.relation?.name ?? '', lsn, before: log.old, after: log.new })
          resolve?.()
        })

        service.subscribe(plugin, slot, startLsn).catch(() => { /* surfaced via signal */ })
        signal?.addEventListener('abort', () => service.stop())

        while (!signal?.aborted) {
          if (queue.length === 0) await new Promise<void>(r => { resolve = r })
          while (queue.length) yield queue.shift()!
        }
      }

      return iterate()
    },
  }
}

#Wiring into AgentsKitOS triggers

import { createChatTrigger } from '@agentskit/triggers'
import { email } from '@agentskit/tools'
import { emailTransport, imapClient } from './email-adapter'

createChatTrigger({
  source: 'imap',
  client: imapClient,
  poll: { intervalMs: 60_000, mailbox: 'INBOX', unseenOnly: true },
  runtime,
  outboundTools: [...email({ transport: emailTransport, defaultFrom: 'bot@example.com' })],
})

Explore nearby

✎ Edit this page on GitHubΒ·Found a problem? Open an issue β†’Β·How to contribute β†’

On this page