Idempotency and Data Synchronization

1. The Problem: Duplicate Content

In a system that periodically syncs data from external sources, a common challenge is preventing the same piece of content from being downloaded multiple times. Without a de-duplication mechanism, our integrations will repeatedly fetch and store the same emails, files, or messages on every polling cycle.

This leads to:

  • Data Duplication: The database fills with redundant copies of the same information.
  • Wasted Resources: The system consumes unnecessary API quota, CPU time, and network bandwidth processing content it already has.
  • Potential for Errors: Reprocessing the same data can sometimes lead to inconsistent states or errors.

The goal is to make our synchronization tasks idempotent, meaning that running the same task multiple times has the same effect as running it once.
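As a toy illustration of that property (the store and function names here are hypothetical, not part of the real system), a store keyed by the content's external ID makes a repeated sync a no-op rather than a duplicate:

```typescript
// A toy content store keyed by externalId. Writing the same item twice
// leaves the store in exactly the same state as writing it once.
const store = new Map<string, string>();

function syncMessage(externalId: string, body: string): void {
  // Because the key is the external ID, a repeated sync overwrites
  // identical data instead of appending a duplicate row.
  store.set(externalId, body);
}

syncMessage("msg-1", "hello");
syncMessage("msg-1", "hello"); // second run: no duplicate is created
```

After both calls the store holds exactly one entry for `msg-1`, which is the behavior the rest of this document works toward.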

2. The Constraint: Decoupled Integrations

A core design principle of our integration architecture is that individual integrations (e.g., Gmail, Slack) must remain decoupled from the main application's core services and database implementation. An integration should not directly import or depend on UserIntegrationContentService or any other application-specific service.

3. Proposed Solution: Dependency Inversion via Callback

To solve the problem while respecting the constraint, we will use the principle of Dependency Inversion. We will provide each integration with a generic "checker" function when it is created. This gives the integration the capability it needs without it knowing the underlying implementation details.

Step 1: Define the ContentExistenceChecker Function

We will define a new function type that acts as the contract between the core application and each integration.

/**
 * @param externalIds An array of unique identifiers from the external source (e.g., Gmail message IDs).
 * @returns A Promise that resolves to a Set containing the subset of IDs that already exist in our database.
 */
export type ContentExistenceChecker = (externalIds: string[]) => Promise<Set<string>>;
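A hypothetical in-memory implementation of this contract (useful in unit tests, where a plain Set of already-stored IDs stands in for the database; `makeInMemoryChecker` is illustrative, not an existing helper) might look like:

```typescript
type ContentExistenceChecker = (externalIds: string[]) => Promise<Set<string>>;

// Sketch: an in-memory checker backed by a plain Set of stored IDs.
// In production the checker would query the database instead (see Step 3).
function makeInMemoryChecker(storedIds: Iterable<string>): ContentExistenceChecker {
  const stored = new Set(storedIds);
  // Resolve to only the subset of requested IDs that are already stored.
  return async (externalIds) => new Set(externalIds.filter((id) => stored.has(id)));
}
```

Note that the contract returns the *existing* subset, so the caller computes "new items" by set difference; this keeps the checker a single bulk lookup regardless of how many IDs are passed in.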

Step 2: Inject the Checker into the BaseIntegration

We will update the BaseIntegration constructor to accept an implementation of this checker function. This makes the capability available to all integrations.

// In: src/integration/_base/main.ts

export abstract class BaseIntegration {
  protected credentials: any;
  protected contentExists: ContentExistenceChecker;

  constructor(
    credentials: any,
    contentExists: ContentExistenceChecker,
  ) {
    this.credentials = credentials;
    this.contentExists = contentExists;
  }
  // ...
}
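To make the wiring concrete, a subclass only needs to forward the checker to the base constructor; the `ExampleIntegration` below is a hypothetical sketch, not the real Gmail integration:

```typescript
type ContentExistenceChecker = (externalIds: string[]) => Promise<Set<string>>;

abstract class BaseIntegration {
  protected credentials: unknown;
  protected contentExists: ContentExistenceChecker;

  constructor(credentials: unknown, contentExists: ContentExistenceChecker) {
    this.credentials = credentials;
    this.contentExists = contentExists;
  }
}

// Hypothetical integration: it never learns how the checker is implemented,
// it only calls the injected function.
class ExampleIntegration extends BaseIntegration {
  // Return the subset of IDs that have not been stored yet.
  async filterNew(externalIds: string[]): Promise<string[]> {
    const existing = await this.contentExists(externalIds);
    return externalIds.filter((id) => !existing.has(id));
  }
}
```

Because the checker is just a function value, tests can construct an integration with a stub checker and no database at all, which is the payoff of the dependency inversion.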

Step 3: Implement the Checker in the PollingProcessor

The PollingProcessor is responsible for creating integration instances. It has access to application services, so it is the perfect place to create the concrete implementation of the checker function.

// In: src/integration/polling/polling.processor.ts
// ... inside the handleTask method

// Create the checker function using the content service
// Create the checker function using the content service
const contentExists: ContentExistenceChecker = async (externalIds) => {
  const { results } = await this.userIntegrationContentService.readMany({
    userIntegrationId,
    inExternalIds: externalIds, // Assumes this filter is added to the service
  });
  return new Set(results.map(r => r.externalId));
};

// Pass the function when creating the integration instance
const integration = this.loader.loadIntegrationInstance(
  integrationDomain,
  credentials,
  contentExists, // The function is passed here
);
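The `inExternalIds` filter is assumed above rather than existing today. In semantic terms (the row shape and names here are hypothetical) it is a bulk membership test scoped to a single user integration; a real implementation would translate this into one SQL `IN` query rather than filtering in memory:

```typescript
// Hypothetical row shape stored by the content service.
interface ContentRow {
  userIntegrationId: string;
  externalId: string;
}

// Sketch of the filter's semantics: keep rows that belong to the given
// user integration AND whose externalId appears in the requested list.
function applyInExternalIds(
  rows: ContentRow[],
  userIntegrationId: string,
  inExternalIds: string[],
): ContentRow[] {
  const wanted = new Set(inExternalIds);
  return rows.filter(
    (r) => r.userIntegrationId === userIntegrationId && wanted.has(r.externalId),
  );
}
```

Scoping by `userIntegrationId` matters: two users can legitimately hold content with the same external ID (e.g., a shared Slack message), so existence must be checked per user integration, not globally.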

Step 4: Use the Checker Within an Integration

The individual integration can now use the provided function to filter out duplicates before creating new processing tasks.

// In: src/integration/gmail/src/main.ts
// ... inside the runTask method

// 1. Get all message IDs from the current API response
const messageIds = response.data.messages.map(m => m.id!);

// 2. Use the provided function to find which IDs we already have
const existingIds = await this.contentExists(messageIds);

// 3. Only create download tasks for messages that are not in the "existing" set
const newMessages = response.data.messages.filter(m => !existingIds.has(m.id!));

for (const message of newMessages) {
  // Create a download task for this new message...
}

4. Handling Flexibility and Edge Cases

This design is flexible enough to handle important edge cases.

Optional De-duplication (e.g., for an FTP integration)

Some integrations may want to re-download content to check for differences. This design accommodates that scenario because using the checker is optional.

An integration that wants to de-duplicate (like Gmail) will call this.contentExists. An integration that wants to re-download everything (like a hypothetical FTP integration) will simply not call this.contentExists. The capability is there if needed, but it is not enforced.

Forcing a Re-Sync

There will be times when a user or admin needs to force a specific piece of content to be re-downloaded. This can be accomplished by adding an optional force flag to the task payload.

The integration's logic would then be updated to check for this flag:

// Inside an integration's runTask method...

public async runTask(task: GmailTask): Promise<GmailTaskResult> {
  // ...
  const messageIds = response.data.messages.map(m => m.id!);
  let existingIds = new Set<string>();

  // Only perform the de-duplication check if the 'force' flag is NOT set
  if (!task.payload.force) {
    existingIds = await this.contentExists(messageIds);
  }

  // If forcing specific IDs, remove them from the set of existing IDs
  if (task.payload.forceIds) {
    task.payload.forceIds.forEach(id => existingIds.delete(id));
  }

  const newMessages = response.data.messages.filter(m => !existingIds.has(m.id!));
  // ... create tasks for newMessages
}

This gives us granular control to bypass the de-duplication logic when needed, without complicating the core design.

5. Summary of Benefits

  • Decoupled: Integrations don't know about the database or core services.
  • Reusable: The same pattern can be used by any integration.
  • Flexible: Integrations can opt-out of de-duplication, and re-syncs can be forced when necessary.
  • Efficient: Performs a single, bulk query to check for many items at once.