Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/resources/connector/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from "./api.js";
export * from "./config.js";
export * from "./oauth.js";
export * from "./push.js";
export * from "./resource.js";
export * from "./schema.js";
46 changes: 46 additions & 0 deletions src/core/resources/connector/oauth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import open from "open";
import pWaitFor, { TimeoutError } from "p-wait-for";
import { getOAuthStatus } from "./api.js";
import type { IntegrationType, ConnectorOAuthStatus } from "./schema.js";

const POLL_INTERVAL_MS = 2000;
const POLL_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes

export interface OAuthFlowParams {
type: IntegrationType;
redirectUrl: string;
connectionId: string;
}

export interface OAuthFlowResult {
type: IntegrationType;
status: ConnectorOAuthStatus;
}

export async function runOAuthFlow(
params: OAuthFlowParams
): Promise<OAuthFlowResult> {
await open(params.redirectUrl);

let finalStatus: ConnectorOAuthStatus = "PENDING";

await pWaitFor(
async () => {
const response = await getOAuthStatus(params.type, params.connectionId);
finalStatus = response.status;
return response.status !== "PENDING";
},
{
interval: POLL_INTERVAL_MS,
timeout: POLL_TIMEOUT_MS,
}
).catch((err) => {
if (err instanceof TimeoutError) {
finalStatus = "PENDING";
} else {
throw err;
}
});

return { type: params.type, status: finalStatus };
}
6 changes: 3 additions & 3 deletions src/core/resources/connector/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ export const SyncConnectorResponseSchema = z.object({

export type SyncConnectorResponse = z.infer<typeof SyncConnectorResponseSchema>;

export const OAuthPollingStatusSchema = z.enum(["ACTIVE", "FAILED", "PENDING"]);
export const ConnectorOAuthStatusSchema = z.enum(["ACTIVE", "FAILED", "PENDING"]);

export type OAuthPollingStatus = z.infer<typeof OAuthPollingStatusSchema>;
export type ConnectorOAuthStatus = z.infer<typeof ConnectorOAuthStatusSchema>;

export const OAuthStatusResponseSchema = z.object({
status: OAuthPollingStatusSchema,
status: ConnectorOAuthStatusSchema,
});

export type OAuthStatusResponse = z.infer<typeof OAuthStatusResponseSchema>;
Expand Down
57 changes: 57 additions & 0 deletions tests/core/connectors.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { resolve } from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import * as api from "../../src/core/resources/connector/api.js";
import { readAllConnectors } from "../../src/core/resources/connector/config.js";
import {
runOAuthFlow,
type OAuthFlowParams,
} from "../../src/core/resources/connector/oauth.js";
import { pushConnectors } from "../../src/core/resources/connector/push.js";
import {
ConnectorResourceSchema,
Expand All @@ -10,6 +14,7 @@ import {
} from "../../src/core/resources/connector/schema.js";

vi.mock("../../src/core/resources/connector/api.js");
vi.mock("open", () => ({ default: vi.fn() }));

const FIXTURES_DIR = resolve(__dirname, "../fixtures");

Expand Down Expand Up @@ -353,3 +358,55 @@ describe("pushConnectors", () => {
]);
});
});

const mockGetOAuthStatus = vi.mocked(api.getOAuthStatus);

describe("runOAuthFlow", () => {
beforeEach(() => {
vi.resetAllMocks();
});

it("returns ACTIVE when OAuth completes successfully", async () => {
const params: OAuthFlowParams = {
type: "gmail",
redirectUrl: "https://accounts.google.com/oauth",
connectionId: "conn_123",
};
mockGetOAuthStatus.mockResolvedValue({ status: "ACTIVE" });

const result = await runOAuthFlow(params);

expect(result).toEqual({ type: "gmail", status: "ACTIVE" });
expect(mockGetOAuthStatus).toHaveBeenCalledWith("gmail", "conn_123");
});

it("returns FAILED when OAuth fails", async () => {
const params: OAuthFlowParams = {
type: "gmail",
redirectUrl: "https://accounts.google.com/oauth",
connectionId: "conn_123",
};
mockGetOAuthStatus.mockResolvedValue({ status: "FAILED" });

const result = await runOAuthFlow(params);

expect(result).toEqual({ type: "gmail", status: "FAILED" });
});

it("polls until status changes from PENDING", async () => {
const params: OAuthFlowParams = {
type: "gmail",
redirectUrl: "https://accounts.google.com/oauth",
connectionId: "conn_123",
};
mockGetOAuthStatus
.mockResolvedValueOnce({ status: "PENDING" })
.mockResolvedValueOnce({ status: "PENDING" })
.mockResolvedValueOnce({ status: "ACTIVE" });

const result = await runOAuthFlow(params);

expect(result).toEqual({ type: "gmail", status: "ACTIVE" });
expect(mockGetOAuthStatus).toHaveBeenCalledTimes(3);
});
});