diff --git a/src/core/resources/connector/index.ts b/src/core/resources/connector/index.ts index 2d2896b..417096f 100644 --- a/src/core/resources/connector/index.ts +++ b/src/core/resources/connector/index.ts @@ -1,5 +1,6 @@ export * from "./api.js"; export * from "./config.js"; +export * from "./oauth.js"; export * from "./push.js"; export * from "./resource.js"; export * from "./schema.js"; diff --git a/src/core/resources/connector/oauth.ts b/src/core/resources/connector/oauth.ts new file mode 100644 index 0000000..0ca0595 --- /dev/null +++ b/src/core/resources/connector/oauth.ts @@ -0,0 +1,46 @@ +import open from "open"; +import pWaitFor, { TimeoutError } from "p-wait-for"; +import { getOAuthStatus } from "./api.js"; +import type { IntegrationType, ConnectorOAuthStatus } from "./schema.js"; + +const POLL_INTERVAL_MS = 2000; +const POLL_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes + +export interface OAuthFlowParams { + type: IntegrationType; + redirectUrl: string; + connectionId: string; +} + +export interface OAuthFlowResult { + type: IntegrationType; + status: ConnectorOAuthStatus; +} + +export async function runOAuthFlow( + params: OAuthFlowParams +): Promise { + await open(params.redirectUrl); + + let finalStatus: ConnectorOAuthStatus = "PENDING"; + + await pWaitFor( + async () => { + const response = await getOAuthStatus(params.type, params.connectionId); + finalStatus = response.status; + return response.status !== "PENDING"; + }, + { + interval: POLL_INTERVAL_MS, + timeout: POLL_TIMEOUT_MS, + } + ).catch((err) => { + if (err instanceof TimeoutError) { + finalStatus = "PENDING"; + } else { + throw err; + } + }); + + return { type: params.type, status: finalStatus }; +} diff --git a/src/core/resources/connector/schema.ts b/src/core/resources/connector/schema.ts index c47f16a..4406d01 100644 --- a/src/core/resources/connector/schema.ts +++ b/src/core/resources/connector/schema.ts @@ -142,12 +142,12 @@ export const SyncConnectorResponseSchema = z.object({ export type SyncConnectorResponse = z.infer; -export const OAuthPollingStatusSchema = z.enum(["ACTIVE", "FAILED", "PENDING"]); +export const ConnectorOAuthStatusSchema = z.enum(["ACTIVE", "FAILED", "PENDING"]); -export type OAuthPollingStatus = z.infer; +export type ConnectorOAuthStatus = z.infer; export const OAuthStatusResponseSchema = z.object({ - status: OAuthPollingStatusSchema, + status: ConnectorOAuthStatusSchema, }); export type OAuthStatusResponse = z.infer; diff --git a/tests/core/connectors.spec.ts b/tests/core/connectors.spec.ts index 7a65739..6ea0d31 100644 --- a/tests/core/connectors.spec.ts +++ b/tests/core/connectors.spec.ts @@ -2,6 +2,10 @@ import { resolve } from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import * as api from "../../src/core/resources/connector/api.js"; import { readAllConnectors } from "../../src/core/resources/connector/config.js"; +import { + runOAuthFlow, + type OAuthFlowParams, +} from "../../src/core/resources/connector/oauth.js"; import { pushConnectors } from "../../src/core/resources/connector/push.js"; import { ConnectorResourceSchema, @@ -10,6 +14,7 @@ import { } from "../../src/core/resources/connector/schema.js"; vi.mock("../../src/core/resources/connector/api.js"); +vi.mock("open", () => ({ default: vi.fn() })); const FIXTURES_DIR = resolve(__dirname, "../fixtures"); @@ -353,3 +358,55 @@ describe("pushConnectors", () => { ]); }); }); + +const mockGetOAuthStatus = vi.mocked(api.getOAuthStatus); + +describe("runOAuthFlow", () => { + beforeEach(() => { + vi.resetAllMocks(); + }); + + it("returns ACTIVE when OAuth completes successfully", async () => { + const params: OAuthFlowParams = { + type: "gmail", + redirectUrl: "https://accounts.google.com/oauth", + connectionId: "conn_123", + }; + mockGetOAuthStatus.mockResolvedValue({ status: "ACTIVE" }); + + const result = await runOAuthFlow(params); + + expect(result).toEqual({ type: "gmail", status: "ACTIVE" }); + expect(mockGetOAuthStatus).toHaveBeenCalledWith("gmail", "conn_123"); + }); + + it("returns FAILED when OAuth fails", async () => { + const params: OAuthFlowParams = { + type: "gmail", + redirectUrl: "https://accounts.google.com/oauth", + connectionId: "conn_123", + }; + mockGetOAuthStatus.mockResolvedValue({ status: "FAILED" }); + + const result = await runOAuthFlow(params); + + expect(result).toEqual({ type: "gmail", status: "FAILED" }); + }); + + it("polls until status changes from PENDING", async () => { + const params: OAuthFlowParams = { + type: "gmail", + redirectUrl: "https://accounts.google.com/oauth", + connectionId: "conn_123", + }; + mockGetOAuthStatus + .mockResolvedValueOnce({ status: "PENDING" }) + .mockResolvedValueOnce({ status: "PENDING" }) + .mockResolvedValueOnce({ status: "ACTIVE" }); + + const result = await runOAuthFlow(params); + + expect(result).toEqual({ type: "gmail", status: "ACTIVE" }); + expect(mockGetOAuthStatus).toHaveBeenCalledTimes(3); + }); +});