Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/resources/connector/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from "./api.js";
export * from "./config.js";
export * from "./push.js";
export * from "./resource.js";
export * from "./schema.js";
91 changes: 91 additions & 0 deletions src/core/resources/connector/push.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import {
listConnectors,
removeConnector,
syncConnector,
} from "./api.js";
import type {
ConnectorResource,
IntegrationType,
SyncConnectorResponse,
} from "./schema.js";

export interface ConnectorSyncResult {
type: IntegrationType;
action: "synced" | "removed" | "needs_oauth" | "error";
redirectUrl?: string;
connectionId?: string;
error?: string;
}

export interface PushConnectorsResponse {
results: ConnectorSyncResult[];
}

export async function pushConnectors(
connectors: ConnectorResource[]
): Promise<PushConnectorsResponse> {
const results: ConnectorSyncResult[] = [];
const upstream = await listConnectors();
const localTypes = new Set(connectors.map((c) => c.type));

for (const connector of connectors) {
try {
const response = await syncConnector(connector.type, connector.scopes);
results.push(syncResponseToResult(connector.type, response));
} catch (err) {
results.push({
type: connector.type,
action: "error",
error: err instanceof Error ? err.message : String(err),
});
}
}

for (const upstreamConnector of upstream.integrations) {
if (!localTypes.has(upstreamConnector.integration_type)) {
try {
await removeConnector(upstreamConnector.integration_type);
results.push({
type: upstreamConnector.integration_type,
action: "removed",
});
} catch (err) {
results.push({
type: upstreamConnector.integration_type,
action: "error",
error: err instanceof Error ? err.message : String(err),
});
}
}
}

return { results };
}

function syncResponseToResult(
type: IntegrationType,
response: SyncConnectorResponse
): ConnectorSyncResult {
if (response.error === "different_user") {
return {
type,
action: "error",
error: response.error_message || `Already connected by ${response.other_user_email}`,
};
}

if (response.already_authorized) {
return { type, action: "synced" };
}

if (response.redirect_url) {
return {
type,
action: "needs_oauth",
redirectUrl: response.redirect_url,
connectionId: response.connection_id ?? undefined,
};
}

return { type, action: "synced" };
}
5 changes: 2 additions & 3 deletions src/core/resources/connector/resource.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import type { Resource } from "../types.js";
import { readAllConnectors } from "./config.js";
import { pushConnectors } from "./push.js";
import type { ConnectorResource } from "./schema.js";

export const connectorResource: Resource<ConnectorResource> = {
readAll: readAllConnectors,
push: async () => {
throw new Error("Connector push not yet implemented");
},
push: pushConnectors,
};
209 changes: 208 additions & 1 deletion tests/core/connectors.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { resolve } from "node:path";
import { describe, expect, it } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import * as api from "../../src/core/resources/connector/api.js";
import { readAllConnectors } from "../../src/core/resources/connector/config.js";
import { pushConnectors } from "../../src/core/resources/connector/push.js";
import {
ConnectorResourceSchema,
IntegrationTypeSchema,
type ConnectorResource,
} from "../../src/core/resources/connector/schema.js";

vi.mock("../../src/core/resources/connector/api.js");

const FIXTURES_DIR = resolve(__dirname, "../fixtures");

describe("IntegrationTypeSchema", () => {
Expand Down Expand Up @@ -146,3 +151,205 @@ describe("readAllConnectors", () => {
);
});
});

const mockListConnectors = vi.mocked(api.listConnectors);
const mockSyncConnector = vi.mocked(api.syncConnector);
const mockRemoveConnector = vi.mocked(api.removeConnector);

describe("pushConnectors", () => {
beforeEach(() => {
vi.resetAllMocks();
mockListConnectors.mockResolvedValue({ integrations: [] });
});

it("returns empty results when no local or upstream connectors", async () => {
const result = await pushConnectors([]);
expect(result.results).toEqual([]);
expect(mockListConnectors).toHaveBeenCalledOnce();
});

it("syncs local connectors", async () => {
const local: ConnectorResource[] = [
{ type: "gmail", scopes: ["https://mail.google.com/"] },
];
mockSyncConnector.mockResolvedValue({
redirect_url: null,
connection_id: null,
already_authorized: true,
});

const result = await pushConnectors(local);

expect(mockSyncConnector).toHaveBeenCalledWith("gmail", [
"https://mail.google.com/",
]);
expect(result.results).toEqual([{ type: "gmail", action: "synced" }]);
});

it("removes upstream-only connectors", async () => {
mockListConnectors.mockResolvedValue({
integrations: [
{ integration_type: "slack", status: "ACTIVE", scopes: ["chat:write"] },
],
});
mockRemoveConnector.mockResolvedValue({
status: "removed",
integration_type: "slack",
});

const result = await pushConnectors([]);

expect(mockRemoveConnector).toHaveBeenCalledWith("slack");
expect(result.results).toEqual([{ type: "slack", action: "removed" }]);
});

it("syncs local and removes upstream-only", async () => {
const local: ConnectorResource[] = [
{ type: "gmail", scopes: ["https://mail.google.com/"] },
];
mockListConnectors.mockResolvedValue({
integrations: [
{ integration_type: "slack", status: "ACTIVE", scopes: ["chat:write"] },
],
});
mockSyncConnector.mockResolvedValue({
redirect_url: null,
connection_id: null,
already_authorized: true,
});
mockRemoveConnector.mockResolvedValue({
status: "removed",
integration_type: "slack",
});

const result = await pushConnectors(local);

expect(mockSyncConnector).toHaveBeenCalledWith("gmail", [
"https://mail.google.com/",
]);
expect(mockRemoveConnector).toHaveBeenCalledWith("slack");
expect(result.results).toEqual([
{ type: "gmail", action: "synced" },
{ type: "slack", action: "removed" },
]);
});

it("does not remove connectors that exist locally", async () => {
const local: ConnectorResource[] = [
{ type: "gmail", scopes: ["https://mail.google.com/"] },
];
mockListConnectors.mockResolvedValue({
integrations: [
{
integration_type: "gmail",
status: "ACTIVE",
scopes: ["https://mail.google.com/"],
},
],
});
mockSyncConnector.mockResolvedValue({
redirect_url: null,
connection_id: null,
already_authorized: true,
});

const result = await pushConnectors(local);

expect(mockRemoveConnector).not.toHaveBeenCalled();
expect(result.results).toEqual([{ type: "gmail", action: "synced" }]);
});

it("returns needs_oauth when redirect_url is present", async () => {
const local: ConnectorResource[] = [
{ type: "gmail", scopes: ["https://mail.google.com/"] },
];
mockSyncConnector.mockResolvedValue({
redirect_url: "https://accounts.google.com/oauth",
connection_id: "conn_123",
already_authorized: false,
});

const result = await pushConnectors(local);

expect(result.results).toEqual([
{
type: "gmail",
action: "needs_oauth",
redirectUrl: "https://accounts.google.com/oauth",
connectionId: "conn_123",
},
]);
});

it("returns error for different_user response", async () => {
const local: ConnectorResource[] = [
{ type: "gmail", scopes: ["https://mail.google.com/"] },
];
mockSyncConnector.mockResolvedValue({
redirect_url: null,
connection_id: null,
already_authorized: false,
error: "different_user",
error_message: "Already connected by another user",
other_user_email: "other@example.com",
});

const result = await pushConnectors(local);

expect(result.results).toEqual([
{
type: "gmail",
action: "error",
error: "Already connected by another user",
},
]);
});

it("handles sync errors gracefully", async () => {
const local: ConnectorResource[] = [
{ type: "gmail", scopes: ["https://mail.google.com/"] },
];
mockSyncConnector.mockRejectedValue(new Error("Network error"));

const result = await pushConnectors(local);

expect(result.results).toEqual([
{ type: "gmail", action: "error", error: "Network error" },
]);
});

it("handles remove errors gracefully", async () => {
mockListConnectors.mockResolvedValue({
integrations: [
{ integration_type: "slack", status: "ACTIVE", scopes: ["chat:write"] },
],
});
mockRemoveConnector.mockRejectedValue(new Error("Remove failed"));

const result = await pushConnectors([]);

expect(result.results).toEqual([
{ type: "slack", action: "error", error: "Remove failed" },
]);
});

it("processes multiple local connectors", async () => {
const local: ConnectorResource[] = [
{ type: "gmail", scopes: ["https://mail.google.com/"] },
{ type: "slack", scopes: ["chat:write"] },
];
mockSyncConnector.mockResolvedValue({
redirect_url: null,
connection_id: null,
already_authorized: true,
});

const result = await pushConnectors(local);

expect(mockSyncConnector).toHaveBeenCalledTimes(2);
expect(result.results).toEqual([
{ type: "gmail", action: "synced" },
{ type: "slack", action: "synced" },
]);
});
});