
Writing Integrations

Introduction

What is an Integration?

An integration is a plugin that connects to an external service (Gmail, Slack, Google Drive, etc.) and fetches content into your system. Each integration is a self-contained module that knows how to authenticate with the service and retrieve data.

How Integrations Work

Integrations use a task-based architecture where work is broken into small, independent tasks:

  1. Initial Task - The polling system creates an initial task (e.g., "get email list")
  2. Task Execution - The integration runs the task and returns two things:
    • output - Data to save to the database
    • tasks - New tasks to run (for pagination, downloading files, etc.)
  3. Recursive Processing - New tasks are queued and processed, spawning more tasks until done
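
The three steps above can be sketched as a minimal worker loop. The type and function names here (Task, TaskResult, drainQueue) are illustrative, not the framework's real base classes:

```typescript
// Minimal sketch of the task contract and worker loop (illustrative names;
// the real base classes live in the integration framework).
interface Task {
  type: string;                     // e.g. 'start', 'getEmailList'
  payload: Record<string, unknown>;
}

interface TaskResult {
  output: unknown[]; // data to save to the database
  tasks: Task[];     // follow-up work to enqueue
}

// Run tasks until the queue drains: each result's output is saved, and its
// follow-up tasks are pushed back onto the queue.
async function drainQueue(
  queue: Task[],
  run: (task: Task) => Promise<TaskResult>,
  save: (rows: unknown[]) => Promise<void>,
): Promise<number> {
  let processed = 0;
  while (queue.length > 0) {
    const task = queue.shift()!;
    const result = await run(task);
    await save(result.output);
    queue.push(...result.tasks); // recursion happens via the queue, not the call stack
    processed++;
  }
  return processed;
}
```

Note that "recursive processing" is queue-driven: a task never calls another task directly; it only returns descriptions of work to do next.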

Example: Gmail Flow

1. Task: start (priority 1)
→ Delegates to: getEmailList

2. Task: getEmailList
→ Returns: 50 email IDs + "next page" task
→ Creates: 50 x downloadEmail + 1 x getEmailList (next page)

3. Tasks: 50 x downloadEmail (priority 2) + 1 x getEmailList (priority 2)
→ Each downloadEmail returns: email content
→ getEmailList returns: 50 more email IDs + "next page" task

4. Process continues until no more pages

Getting Started

Integration File Structure

backend/src/integration/gmail/
├── src/
│   └── main.ts       # Integration code
├── manifest.json     # Integration metadata
└── package.json      # Dependencies (tracked but not used yet)

Step 1: Define Your Types

Every integration needs these types:

IMPORTANT: All integrations must support a 'start' task type. This is used by the polling system to initiate fresh syncs every minute with high priority tasks.

// Task types - what operations can your integration perform?
// IMPORTANT: Always include 'start' as the first task type
export type GmailTaskType = 'start' | 'getEmailList' | 'downloadEmail';

// Task payload - data needed to run a task
export class GmailTaskPayload {
  emailId?: string | null;   // For downloadEmail task
  pageToken?: string | null; // For pagination

  constructor(config: GmailTaskPayload) {
    this.emailId = config.emailId;
    this.pageToken = config.pageToken;
  }
}

// Task - combines type + payload
export class GmailTask extends BaseTask {
  type: GmailTaskType;
  payload: GmailTaskPayload;

  constructor(type: GmailTaskType, payload: GmailTaskPayload) {
    super({ type, payload });
    this.type = type;
    this.payload = payload;
  }
}

// Task output - data to save to database
export class GmailTaskOutput extends BaseTaskOutput {}

// Task result - what runTask() returns
export class GmailTaskResult extends BaseTaskResult {
  output: GmailTaskOutput[];
  tasks?: GmailTask[];

  constructor(config?: GmailTaskResult) {
    super();
    this.output = config?.output || [];
    this.tasks = config?.tasks || [];
  }
}

// Credentials - authentication data
export class GmailAuthCredentials extends BaseAuthCredentials {
  clientId: string;
  clientSecret: string;
  accessToken: string;
  refreshToken: string;

  constructor(credentials: GmailAuthCredentials) {
    super();
    const { clientId, clientSecret, accessToken, refreshToken } = credentials;
    this.clientId = clientId;
    this.clientSecret = clientSecret;
    this.accessToken = accessToken;
    this.refreshToken = refreshToken;
  }
}

Step 2: Create Your Integration Class

import { google, gmail_v1 } from 'googleapis';
import {
  BaseAuthCredentials,
  BaseIntegration,
  BaseTask,
  BaseTaskOutput,
  BaseTaskResult,
  ContentExistenceChecker,
} from '../../_base/main';

export class GmailIntegration extends BaseIntegration {
  private oAuth2Client: any;
  private gmailClient: gmail_v1.Gmail;

  constructor(credentials: GmailAuthCredentials, contentExists?: ContentExistenceChecker) {
    super(credentials, contentExists);

    // Set up your API client
    this.oAuth2Client = new google.auth.OAuth2(
      this.getCredential('clientId') as string,
      this.getCredential('clientSecret') as string,
    );

    this.gmailClient = google.gmail({
      version: 'v1',
      auth: this.oAuth2Client,
    });
  }

  // Initialize connection (called once per task)
  public async initialize(): Promise<void> {
    this.oAuth2Client.setCredentials({
      refresh_token: this.getCredential('refreshToken') as string,
    });
  }

  // Validate credentials
  public async validateAuthentication(): Promise<boolean> {
    try {
      await this.initialize();
      await this.gmailClient.users.labels.list({ userId: 'me' });
      return true;
    } catch (err) {
      return false;
    }
  }

  // Main task runner - routes to the appropriate handler
  public async runTask(task: GmailTask): Promise<GmailTaskResult> {
    if (task.type === 'start') {
      // Handle the "start" task - called every minute by the polling system.
      // For Gmail, delegate to getEmailList with no pageToken to start from
      // the most recent emails.
      return await this.handleGetEmailList(
        new GmailTask('getEmailList', new GmailTaskPayload({ pageToken: null }))
      );
    } else if (task.type === 'getEmailList') {
      return await this.handleGetEmailList(task);
    } else if (task.type === 'downloadEmail') {
      return await this.handleDownloadEmail(task);
    }

    // Unknown task type
    return new GmailTaskResult({ output: [], tasks: [] });
  }

  // Clean up (called after task completes)
  public async shutdown(): Promise<void> {
    if (this.oAuth2Client) {
      this.oAuth2Client.setCredentials({});
    }
  }
}

Step 3: Implement Task Handlers

Task Handler Pattern:

  • Each task type gets its own handler method
  • Handlers return output (data to save) and tasks (work to do)

// Handler for listing emails
private async handleGetEmailList(task: GmailTask): Promise<GmailTaskResult> {
  const output: GmailTaskOutput[] = [];
  const tasks: GmailTask[] = [];
  const { pageToken } = task.payload;

  try {
    // Call the API
    const response = await this.gmailClient.users.messages.list({
      userId: 'me',
      maxResults: 50,
      pageToken: pageToken || undefined,
    });

    const messages = response.data.messages || [];

    // Create a download task for each email
    for (const message of messages) {
      tasks.push(
        new GmailTask('downloadEmail', new GmailTaskPayload({
          emailId: message.id,
          pageToken: null,
        }))
      );
    }

    // If there are more pages, create a task to fetch the next page
    if (response.data.nextPageToken) {
      tasks.push(
        new GmailTask('getEmailList', new GmailTaskPayload({
          emailId: null,
          pageToken: response.data.nextPageToken,
        }))
      );
    }
  } catch (error) {
    console.error('Error fetching email list:', error);
  }

  return new GmailTaskResult({ output, tasks });
}

// Handler for downloading a single email
private async handleDownloadEmail(task: GmailTask): Promise<GmailTaskResult> {
  const output: GmailTaskOutput[] = [];
  const tasks: GmailTask[] = [];
  const { emailId } = task.payload;

  try {
    // Fetch the full email
    const response = await this.gmailClient.users.messages.get({
      userId: 'me',
      id: emailId!,
      format: 'full',
    });

    // Extract relevant data (extractTo, extractCc, and extractAttachments
    // are helpers analogous to extractSubject/extractFrom)
    const subject = this.extractSubject(response.data);
    const from = this.extractFrom(response.data);
    const to = this.extractTo(response.data);
    const cc = this.extractCc(response.data);
    const attachments = this.extractAttachments(response.data);

    // Create output in the required format
    output.push({
      code: 200,
      id: emailId!,
      timestamp: new Date(parseInt(response.data.internalDate ?? '0', 10)),
      content: {
        subject: subject || '(No Subject)',
        from,
        to,
        cc,
        body: this.extractBody(response.data),
        threadId: response.data.threadId || '',
        labelIds: response.data.labelIds || [],
        hasAttachments: attachments.length > 0,
        attachmentCount: attachments.length,
      },
    });
  } catch (error) {
    console.error(`Error downloading email ${emailId}:`, error);
    // Return error output
    output.push({
      code: 500,
      id: emailId!,
      timestamp: null,
      content: { subject: 'Error', body: 'Failed to download email' },
    });
  }

  return new GmailTaskResult({ output, tasks });
}

Step 4: Register Your Integration

At the end of your main.ts file:

export const integration = () => {
  return {
    manifest: require('../manifest.json'),
    Integration: GmailIntegration,
  };
};

Step 5: Create manifest.json

{
  "domain": "gmail",
  "name": "Gmail",
  "version": "1.0.0",
  "description": "Gmail email integration",
  "author": "Your Name",
  "controlFlow": {}
}

Note: JSON does not allow comments, so controlFlow is left empty here. It is TBD and reserved for a future UI layer to set up your integration.

Core Concepts

Output Format

When your integration returns output, it must follow this structure:

{
  code: 200,        // HTTP-style status (200 = success, 500 = error)
  id: 'msg-123',    // External ID - used for idempotency (prevents duplicates)
  content: { ... }, // Integration-specific JSON structure
  timestamp: Date,  // External timestamp - when this item was created in the source system
}

Important Fields:

  • code - HTTP-style status code (200 = success, 500 = error)
  • id - Must be the unique ID from the external system. This prevents the same item from being downloaded twice.
  • timestamp - Should be when the item was created in the external system (not when you downloaded it).
  • content - Your integration defines the structure. This is where all your integration-specific data goes.

Example for Gmail:

{
  code: 200,
  id: 'msg-abc123',
  timestamp: new Date('2025-11-09T10:30:00Z'),
  content: {
    subject: 'Meeting Tomorrow',
    from: 'alice@example.com',
    to: 'bob@example.com',
    cc: 'charlie@example.com',
    body: 'Let\'s meet at 2pm...',
    threadId: 'thread-xyz',
    labelIds: ['INBOX', 'IMPORTANT'],
    hasAttachments: true,
    attachmentCount: 2,
  }
}

Database Storage

The polling system automatically saves your output to the user_integration_content table:

{
  id: 'uuid',                   // Auto-generated
  userId: 'user-123',           // Which user owns this data
  userIntegrationId: 'int-456', // Which user-integration fetched this
  integrationDomain: 'gmail',   // Your integration domain
  externalId: 'msg-123',        // The 'id' from your output (usually the item's ID on the external server)
  externalTimestamp: Date,      // The 'timestamp' from your output (usually the date the item was created on the external server)
  content: { ... },             // The 'content' from your output (full JSON object)
  contentSize: 1234,            // Auto-calculated size in bytes
  createdAt: Date,              // When saved to our database
  updatedAt: Date               // When last updated (rarely changes, but useful in some situations)
}

Integration Lifecycle

Here's what happens when your integration runs:

1. Polling system creates job in queue
└─> Job contains: credentials, userIntegrationId, task

2. Worker picks up job
└─> Creates instance: new GmailIntegration(credentials, contentExists)

3. Worker calls initialize()
└─> Your code: Set up API client, authenticate

4. Worker calls runTask(task)
└─> Your code: Execute task, return { output, tasks }
└─> Your code: Return follow-up tasks as part of the return value

5. Worker saves output to database
└─> Automatic based on your output format

6. Worker enqueues follow-up tasks
└─> Your tasks array becomes new jobs in the queue

7. Worker calls shutdown()
└─> Your code: Clean up connections

8. Repeat steps 2-7 for each task until queue is empty
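
Steps 3-7 for a single job can be sketched as follows. The queue and database plumbing here (processJob, saveOutput, enqueue) are illustrative, not the real worker code:

```typescript
// Sketch of one worker iteration: initialize, run, save, enqueue, shutdown.
interface IntegrationLike {
  initialize(): Promise<void>;
  runTask(task: unknown): Promise<{ output: unknown[]; tasks?: unknown[] }>;
  shutdown(): Promise<void>;
}

async function processJob(
  integration: IntegrationLike,
  task: unknown,
  saveOutput: (rows: unknown[]) => Promise<void>,
  enqueue: (tasks: unknown[]) => Promise<void>,
): Promise<void> {
  await integration.initialize();                   // step 3: authenticate
  try {
    const result = await integration.runTask(task); // step 4: execute the task
    await saveOutput(result.output);                // step 5: persist output
    await enqueue(result.tasks ?? []);              // step 6: enqueue follow-ups
  } finally {
    await integration.shutdown();                   // step 7: always clean up
  }
}
```

Putting shutdown() in a finally block ensures connections are cleaned up even when a task throws.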

Preventing Duplicates (Idempotency)

The system provides a contentExists callback function to check which items already exist in the database:

// In your integration constructor
constructor(credentials: any, contentExists?: ContentExistenceChecker) {
super(credentials, contentExists);
}

// In your task handler
private async handleGetEmailList(task: GmailTask): Promise<GmailTaskResult> {
// ... fetch messages from API ...

// Get all message IDs
const messageIds = messages.map(m => m.id);

// Check which ones we already have
const existingIds = await this.contentExists(messageIds);

// Only create tasks for messages we don't have yet
const newMessages = messages.filter(m => !existingIds.has(m.id));

for (const message of newMessages) {
tasks.push(
new GmailTask('downloadEmail', new GmailTaskPayload({
emailId: message.id,
pageToken: null,
}))
);
}

return new GmailTaskResult({ output, tasks });
}

This prevents re-downloading content that already exists in your database.

Always use this pattern: it saves API quota and database storage, and prevents duplicate data.

Common Patterns

Pattern 1: List → Download

Most integrations follow this pattern:

  1. List task - Fetch IDs/metadata of items
  2. Download tasks - Fetch full content for each item

// Gmail: getEmailList → downloadEmail
// Google Drive: listFiles → downloadFile
// Slack: getChannels → getMessages

Pattern 2: Pagination

Handle pagination by creating a new list task with a page token:

if (response.data.nextPageToken) {
  tasks.push(
    new GmailTask('getEmailList', new GmailTaskPayload({
      emailId: null,
      pageToken: response.data.nextPageToken,
    }))
  );
}

Pattern 3: Recursive Directory Traversal

For file systems, spawn tasks for subdirectories:

// FTP example
if (item.type === 'directory') {
  tasks.push(
    new FtpTask('listDirectory', new FtpTaskPayload({
      path: item.path,
    }))
  );
}

Advanced Features

Priority Queue System

The polling system uses job priorities to ensure new emails/content are fetched quickly, even during long-running initial syncs.

Priority Levels (lower number = higher priority):

The system uses gradual priority degradation - each generation of tasks gets slightly lower priority:

  • Priority 1 - 'start' tasks from cron (highest)
  • Priority 2 - First generation (email list page 1, fresh downloads)
  • Priority 3 - Second generation (page 2, or downloads from page 1)
  • Priority 4 - Third generation
  • Priority 5+ - Deeper generations
  • Priority 10 - Maximum (deep backfill tasks, capped)
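
The degradation rule above amounts to a one-line formula. This is a sketch of the rule as described, not the scheduler's actual code:

```typescript
// Gradual priority degradation: each spawned generation gets a priority one
// lower (numerically one higher) than its parent, capped at 10.
const MAX_PRIORITY = 10;

function childPriority(parentPriority: number): number {
  return Math.min(parentPriority + 1, MAX_PRIORITY);
}
```

So a priority-1 'start' task spawns priority-2 children, those spawn priority-3 grandchildren, and deep backfill generations all bottom out at 10.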

Why this matters:

Imagine your initial Gmail sync is downloading 10,000 old emails:

  • Without priorities: New emails that arrive won't be fetched until all 10,000 old emails are processed (could take hours)
  • With priorities:
    1. Every minute, a fresh 'start' task (priority 1) jumps to the front
    2. It discovers 5 new emails and creates download tasks for them (priority 2)
    3. Those 5 new downloads jump ahead of the 9,995 remaining old downloads (priority 5+)
    4. New emails are downloaded within ~1-2 minutes

Your integration's responsibility:

When handling the 'start' task, your integration should:

  1. Start from the most recent content (e.g., most recent emails, newest files)
  2. Use the contentExists callback to skip content that's already downloaded
  3. Stop early when you hit content that already exists (see "Early Stop Optimization" below)

Early Stop Optimization

To prevent subsequent syncs from paginating through thousands of old items, implement an early-stop mechanism:

// Add to your task payload
export class GmailTaskPayload {
  emailId?: string | null;
  pageToken?: string | null;
  consecutiveEmptyPages?: number; // NEW: Track empty pages
}

// In your list handler
private async handleGetEmailList(task: GmailTask): Promise<GmailTaskResult> {
  const { pageToken, consecutiveEmptyPages = 0 } = task.payload;

  // Stop early if we've seen 3 consecutive pages with no new content
  const MAX_EMPTY_PAGES = 3;
  if (consecutiveEmptyPages >= MAX_EMPTY_PAGES) {
    console.log('Stopping pagination - no new content');
    return new GmailTaskResult({ output: [], tasks: [] });
  }

  // ... fetch messages ...

  // Check what's new
  const existingIds = await this.contentExists(messageIds);
  const newMessages = messages.filter(m => !existingIds.has(m.id));

  // Update counter
  const nextEmptyPageCount = newMessages.length > 0 ? 0 : consecutiveEmptyPages + 1;

  // Continue to next page with updated counter
  if (response.data.nextPageToken) {
    tasks.push(
      new GmailTask('getEmailList', new GmailTaskPayload({
        pageToken: response.data.nextPageToken,
        consecutiveEmptyPages: nextEmptyPageCount,
      }))
    );
  }

  return new GmailTaskResult({ output, tasks });
}

This way, subsequent syncs stop after checking 3 pages with no new content instead of paginating through the entire history.

Handling Deletions and Updates

Many integrations need to detect when content has been deleted or modified in the external system (Gmail, Slack, etc.).

The Challenge

When a user deletes an email in Gmail, archives a Slack message, or removes content in any external system, our database still contains a copy of that content. We need a way to detect these changes and update our records accordingly.

Problems to solve:

  • Content can be deleted while our system is offline
  • Full mailbox scans are inefficient for large datasets
  • We need to detect changes without excessive API calls
  • Race conditions can occur during concurrent syncs

Solution: History/Delta API Pattern

Most modern APIs provide a "history" or "delta" endpoint that returns only changes since a specific point in time.

How it works:

  1. Initial Sync: When first syncing content, the API returns a historyId or deltaToken
  2. Store Token: Save this token in user_integration.syncState.lastHistoryId
  3. Periodic Checks: Regularly call the history endpoint with your stored token
  4. Process Changes: API returns only items that were added, deleted, or modified since that token
  5. Update Token: Store the new historyId for the next check

Implementation Steps

1. Add History Task Type

export type GmailTaskType =
  | 'start'         // Initial sync
  | 'getEmailList'  // Paginate through emails
  | 'downloadEmail' // Download individual email
  | 'checkHistory'; // NEW: Check for changes/deletions

export class GmailTaskPayload {
  emailId?: string | null;
  pageToken?: string | null;
  startHistoryId?: string | null; // NEW: For history checks
}

2. Implement History Handler

private async handleCheckHistory(task: GmailTask): Promise<GmailTaskResult> {
  const { startHistoryId } = task.payload;

  // Call the external API's history endpoint
  const response = await this.gmailClient.users.history.list({
    userId: 'me',
    startHistoryId: startHistoryId,
    historyTypes: ['messageAdded', 'messageDeleted', 'labelAdded', 'labelRemoved'],
  });

  const output: GmailTaskOutput[] = [];
  const tasks: GmailTask[] = [];

  for (const history of response.data.history || []) {
    // Process deletions
    if (history.messagesDeleted) {
      for (const deleted of history.messagesDeleted) {
        // Mark as deleted with special action
        output.push(new GmailTaskOutput({
          code: 200,
          id: deleted.message.id,
          content: { action: 'delete' }, // Special marker for deletions
          timestamp: new Date(),
        }));
      }
    }

    // Process additions (new emails since last check)
    if (history.messagesAdded) {
      for (const added of history.messagesAdded) {
        // Create a download task for each new message
        tasks.push(new GmailTask('downloadEmail',
          new GmailTaskPayload({ emailId: added.message.id })
        ));
      }
    }
  }

  // Return the updated history token
  return new GmailTaskResult({
    output,
    tasks,
    syncState: {
      lastHistoryId: response.data.historyId, // Store for next check
    },
  });
}

3. Update Task Result Type

export class GmailTaskResult extends BaseTaskResult {
  output: GmailTaskOutput[];
  tasks?: GmailTask[];
  syncState?: {
    lastHistoryId?: string; // NEW: Updated history token
  };
}

4. Polling System Integration

The polling processor should detect syncState and update the database:

// In polling.processor.ts (example - not implemented yet)
if (result.syncState?.lastHistoryId) {
  await this.userIntegrationService.updateOne({
    id: userIntegrationId,
    syncState: {
      ...currentSyncState,
      lastHistoryId: result.syncState.lastHistoryId,
    },
  });
}

The polling service should handle deletion markers:

// In polling.service.ts - saveOutput() (example - not implemented yet)
for (const output of successfulOutputs) {
  if (output.content?.action === 'delete') {
    await this.contentService.markAsDeleted(userIntegrationId, output.id);
  } else {
    await this.contentService.createOne({ ... });
  }
}

Database Schema for Deletions

Add deletion tracking fields to user_integration_content:

ALTER TABLE user_integration_content
  ADD COLUMN deleted_at TIMESTAMP NULL,
  ADD COLUMN is_deleted BOOLEAN DEFAULT FALSE;

CREATE INDEX idx_user_integration_content_is_deleted
  ON user_integration_content (is_deleted, user_integration_id);

Scheduling Strategy

History checks should run more frequently than full syncs:

Full Sync (Initial/Backfill):

  • Run every 15-60 minutes
  • Fetches new content only
  • Low priority for deep backfill tasks

History Check (Changes/Deletes):

  • Run every 1-5 minutes
  • Detects deletions and recent additions
  • High priority for real-time updates
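
One way to picture the two cadences is a pure function that decides which task types are due at a given minute. This is illustrative only (dueTasks and the interval constants are hypothetical; the real system schedules through its own cron and queue infrastructure):

```typescript
// Sketch: which sync tasks are due at a given minute of the hour?
const HISTORY_CHECK_EVERY_MIN = 1; // changes/deletes: every 1-5 minutes
const FULL_SYNC_EVERY_MIN = 30;    // initial/backfill: every 15-60 minutes

function dueTasks(minute: number): { type: string; priority: number }[] {
  const due: { type: string; priority: number }[] = [];
  if (minute % HISTORY_CHECK_EVERY_MIN === 0) {
    due.push({ type: 'checkHistory', priority: 1 }); // high priority, real-time
  }
  if (minute % FULL_SYNC_EVERY_MIN === 0) {
    due.push({ type: 'start', priority: 1 });
  }
  return due;
}
```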

API-Specific Examples

Gmail History API

  • Endpoint: GET /gmail/v1/users/me/history?startHistoryId={id}
  • Token: syncState.lastHistoryId
  • Changes: messagesAdded, messagesDeleted, labelsAdded, labelsRemoved
  • Expiration: ~1 week - must handle 404 errors and fall back to full sync

Slack Conversations API

  • Endpoint: Cursor-based pagination with timestamps
  • Token: syncState.lastMessageTs
  • Changes: Compare timestamps, check for is_deleted: true flag

Microsoft Graph API

  • Endpoint: GET /me/messages/delta?$deltatoken={token}
  • Token: syncState.deltaToken
  • Changes: Returns @removed annotation for deleted items

Fallback Strategy

If history is unavailable (token expired, API doesn't support it):

  1. Weekly Full Comparison: Fetch all external IDs from the API
  2. Compare with Database: Find content in DB that's not in API response
  3. Mark as Deleted: Soft-delete missing items
  4. Update History Token: Get new token for future incremental syncs
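
Step 2 of this fallback is a set difference. A minimal sketch (findDeletedIds is a hypothetical helper; fetching the two ID lists is left to your integration):

```typescript
// Given every ID the external API still reports and every ID stored locally,
// anything stored but no longer reported is a candidate for soft deletion.
function findDeletedIds(apiIds: string[], storedIds: string[]): string[] {
  const live = new Set(apiIds);
  return storedIds.filter(id => !live.has(id));
}
```

Each ID this returns would then be soft-deleted (step 3) rather than removed, preserving the audit trail.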

Best Practices

DO:

  • Use soft deletes (deleted_at, is_deleted) rather than hard deletes
  • Store history tokens in syncState for persistence
  • Handle token expiration gracefully (fall back to full sync)
  • Run history checks frequently (1-5 minutes)
  • Use high priority for history check tasks
  • Filter deleted content from user queries by default

DON'T:

  • Hard delete content (you lose audit trail)
  • Ignore token expiration (can miss deletions)
  • Check history too infrequently (defeats the purpose)
  • Use same frequency as full sync (inefficient)
  • Forget to handle race conditions (multiple workers)

Reference

Complete Integration Checklist

  • Define task types for your integration (must include 'start' as first type)
  • Handle the 'start' task in runTask() method
  • Create credential class extending BaseAuthCredentials
  • Implement initialize() for authentication
  • Implement validateAuthentication()
  • Implement runTask() with routing to handlers
  • Create handler for each task type
  • Return output in correct format (code, id, content, timestamp)
  • Define your content structure (what fields go in the content object)
  • Use contentExists to prevent duplicates
  • Handle pagination by creating new tasks
  • Implement early stop optimization (track consecutive empty pages)
  • Create manifest.json with all task types listed
  • Register integration with integration() export
  • (Optional) Implement history/delta checking for deletions

Complete Gmail Example

See src/integration/gmail/src/main.ts for a complete, working example that demonstrates:

  • OAuth2 authentication
  • Pagination (email list with pageToken)
  • Idempotency (checking for existing emails)
  • Early stop optimization (consecutive empty pages)
  • Structured output format
  • Task spawning pattern
  • Error handling
  • Priority queue integration

Next Steps

After creating your integration:

  1. Add to Integration Loader - The system auto-discovers integrations in src/integration/*/src/main.ts
  2. Create User Integration - Use the CLI to link a user to your integration
  3. Monitor - Watch logs and database to verify data is being fetched
  4. Optimize - Add rate limiting, better error handling, retry logic as needed

Troubleshooting

Q: My integration isn't being picked up

  • Check the manifest.json exists
  • Verify the integration() function is exported
  • Restart the backend to reload integrations

Q: Tasks aren't spawning

  • Ensure you're returning tasks in the result
  • Check task types match your defined types
  • Verify payload structure is correct

Q: Content isn't saving

  • Check output has code: 200
  • Verify id field exists (must be the external system's unique ID)
  • Make sure content is a valid object
  • Ensure timestamp is a valid Date object

Q: Duplicates are being created

  • Use the contentExists callback to check existing items
  • Ensure id field is the external system's unique ID (not a randomly generated one)
  • Check idempotency logic in your task handlers
  • Verify you're filtering out existing IDs before creating download tasks

Q: New content takes too long to appear

  • Verify your 'start' task starts from the most recent content
  • Check that you're using contentExists to skip already-downloaded items
  • Implement early stop optimization
  • Ensure tasks are being created with appropriate priorities