From 1baffe212d1621fb8268c0d83ec191f77693610f Mon Sep 17 00:00:00 2001 From: zetazzz Date: Wed, 7 Jan 2026 15:00:34 +0700 Subject: [PATCH 1/6] cnc jobs up and combined server --- jobs/knative-job-fn/src/index.ts | 2 +- packages/cli/package.json | 1 + packages/cli/src/commands.ts | 4 +- packages/cli/src/commands/jobs.ts | 216 ++++++++++++++++++++++++++++++ packages/cli/src/utils/display.ts | 4 + packages/server/CHANGELOG.md | 5 + packages/server/README.md | 77 +++++++++++ packages/server/jest.config.js | 18 +++ packages/server/package.json | 56 ++++++++ packages/server/src/index.ts | 3 + packages/server/src/run.ts | 96 +++++++++++++ packages/server/src/server.ts | 165 +++++++++++++++++++++++ packages/server/src/types.ts | 39 ++++++ packages/server/tsconfig.esm.json | 9 ++ packages/server/tsconfig.json | 9 ++ pnpm-lock.yaml | 83 +++++++----- 16 files changed, 749 insertions(+), 38 deletions(-) create mode 100644 packages/cli/src/commands/jobs.ts create mode 100644 packages/server/CHANGELOG.md create mode 100644 packages/server/README.md create mode 100644 packages/server/jest.config.js create mode 100644 packages/server/package.json create mode 100644 packages/server/src/index.ts create mode 100644 packages/server/src/run.ts create mode 100644 packages/server/src/server.ts create mode 100644 packages/server/src/types.ts create mode 100644 packages/server/tsconfig.esm.json create mode 100644 packages/server/tsconfig.json diff --git a/jobs/knative-job-fn/src/index.ts b/jobs/knative-job-fn/src/index.ts index 77befd2b1..ff09e1d88 100644 --- a/jobs/knative-job-fn/src/index.ts +++ b/jobs/knative-job-fn/src/index.ts @@ -263,6 +263,6 @@ export default { res.status(200).json({ message: error.message }); }); - app.listen(port, cb); + return app.listen(port, cb); } }; diff --git a/packages/cli/package.json b/packages/cli/package.json index 733ae6c9e..26f3e32a1 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -50,6 +50,7 @@ "@constructive-io/graphql-env": "workspace:^", "@constructive-io/graphql-explorer": "workspace:^", "@constructive-io/graphql-server": "workspace:^", + "@constructive-io/server": "workspace:^", "@inquirerer/utils": "^3.1.2", "@pgpmjs/core": "workspace:^", "@pgpmjs/logger": "workspace:^", diff --git a/packages/cli/src/commands.ts b/packages/cli/src/commands.ts index 3d28adb56..c96f60a38 100644 --- a/packages/cli/src/commands.ts +++ b/packages/cli/src/commands.ts @@ -6,6 +6,7 @@ import { ParsedArgs } from 'minimist'; import codegen from './commands/codegen'; import explorer from './commands/explorer'; import getGraphqlSchema from './commands/get-graphql-schema'; +import jobs from './commands/jobs'; import server from './commands/server'; import { usageText } from './utils'; @@ -14,7 +15,8 @@ const createCommandMap = (): Record => { server, explorer, 'get-graphql-schema': getGraphqlSchema, - codegen + codegen, + jobs }; }; diff --git a/packages/cli/src/commands/jobs.ts b/packages/cli/src/commands/jobs.ts new file mode 100644 index 000000000..084d349b8 --- /dev/null +++ b/packages/cli/src/commands/jobs.ts @@ -0,0 +1,216 @@ +import { existsSync } from 'fs'; +import { resolve } from 'path'; +import { + CombinedServer, + CombinedServerOptions, + FunctionName, + FunctionServiceConfig +} from '@constructive-io/server'; +import { cliExitWithError, extractFirst } from '@inquirerer/utils'; +import { CLIOptions, Inquirerer, Question } from 'inquirerer'; + +const jobsUsageText = ` +Constructive Jobs: + + cnc jobs [OPTIONS] + + Start or manage Constructive jobs services. 
+ +Subcommands: + up Start combined server (jobs runtime) + +Options: + --help, -h Show this help message + --cwd Working directory (default: current directory) + --with-graphql-server Enable GraphQL server (default: disabled; flag-only) + --with-jobs-svc Enable jobs service (default: disabled; flag-only) + --functions Comma-separated functions, optionally with ports (e.g. "fn=8080") + +Examples: + cnc jobs up + cnc jobs up --cwd /path/to/constructive + cnc jobs up --with-graphql-server --functions simple-email,send-email-link=8082 +`; + +const questions: Question[] = [ + { + name: 'withGraphqlServer', + alias: 'with-graphql-server', + message: 'Enable GraphQL server?', + type: 'confirm', + required: false, + default: false, + useDefault: true + }, + { + name: 'withJobsSvc', + alias: 'with-jobs-svc', + message: 'Enable jobs service?', + type: 'confirm', + required: false, + default: false, + useDefault: true + } +]; + +const ensureCwd = (cwd: string): string => { + const resolved = resolve(cwd); + if (!existsSync(resolved)) { + throw new Error(`Working directory does not exist: ${resolved}`); + } + process.chdir(resolved); + return resolved; +}; + +type ParsedFunctionsArg = { + mode: 'all' | 'list'; + names: string[]; + ports: Record; +}; + +const parseFunctionsArg = (value: unknown): ParsedFunctionsArg | undefined => { + if (value === undefined) return undefined; + + const values = Array.isArray(value) ? value : [value]; + + const tokens: string[] = []; + for (const value of values) { + if (value === true) { + tokens.push('all'); + continue; + } + if (value === false || value === undefined || value === null) continue; + const raw = String(value); + raw + .split(',') + .map((part) => part.trim()) + .filter(Boolean) + .forEach((part) => tokens.push(part)); + } + + if (!tokens.length) { + return { mode: 'list', names: [], ports: {} }; + } + + const hasAll = tokens.some((token) => { + const normalized = token.trim().toLowerCase(); + return normalized === 'all' || normalized === '*'; + }); + + if (hasAll) { + if (tokens.length > 1) { + throw new Error('Use "all" without other function names.'); + } + return { mode: 'all', names: [], ports: {} }; + } + + const names: string[] = []; + const ports: Record = {}; + + for (const token of tokens) { + const trimmed = token.trim(); + if (!trimmed) continue; + + const separatorIndex = trimmed.search(/[:=]/); + if (separatorIndex === -1) { + names.push(trimmed); + continue; + } + + const name = trimmed.slice(0, separatorIndex).trim(); + const portText = trimmed.slice(separatorIndex + 1).trim(); + + if (!name) { + throw new Error(`Missing function name in "${token}".`); + } + if (!portText) { + throw new Error(`Missing port for function "${name}".`); + } + + const port = Number(portText); + if (!Number.isFinite(port) || port <= 0) { + throw new Error(`Invalid port "${portText}" for function "${name}".`); + } + + names.push(name); + ports[name] = port; + } + + const uniqueNames: string[] = []; + const seen = new Set(); + for (const name of names) { + if (seen.has(name)) continue; + seen.add(name); + uniqueNames.push(name); + } + + return { mode: 'list', names: uniqueNames, ports }; +}; + +const buildCombinedServerOptions = ( + args: Partial> +): CombinedServerOptions => { + const parsedFunctions = parseFunctionsArg(args.functions); + + let functions: CombinedServerOptions['functions']; + if (parsedFunctions) { + if (parsedFunctions.mode === 'all') { + functions = { enabled: true }; + } else if (parsedFunctions.names.length) { + const services: 
FunctionServiceConfig[] = parsedFunctions.names.map( + (name) => ({ + name: name as FunctionName, + port: parsedFunctions.ports[name] + }) + ); + functions = { enabled: true, services }; + } else { + functions = undefined; + } + } + + return { + graphql: { enabled: args.withGraphqlServer === true }, + jobs: { enabled: args.withJobsSvc === true }, + functions + }; +}; + +export default async ( + argv: Partial>, + prompter: Inquirerer, + _options: CLIOptions +) => { + if (argv.help || argv.h) { + console.log(jobsUsageText); + process.exit(0); + } + + const { first: subcommand, newArgv } = extractFirst(argv); + const args = newArgv as Partial>; + + if (!subcommand) { + console.log(jobsUsageText); + await cliExitWithError('No subcommand provided. Use "up".'); + return; + } + + switch (subcommand) { + case 'up': { + try { + ensureCwd((args.cwd as string) || process.cwd()); + const promptAnswers = await prompter.prompt(args, questions); + await CombinedServer(buildCombinedServerOptions(promptAnswers)); + } catch (error) { + await cliExitWithError( + `Failed to start combined server: ${(error as Error).message}` + ); + } + break; + } + + default: + console.log(jobsUsageText); + await cliExitWithError(`Unknown subcommand: ${subcommand}. Use "up".`); + } +}; diff --git a/packages/cli/src/utils/display.ts b/packages/cli/src/utils/display.ts index 5c856dd2b..f25edc5bf 100644 --- a/packages/cli/src/utils/display.ts +++ b/packages/cli/src/utils/display.ts @@ -12,6 +12,9 @@ export const usageText = ` codegen Generate TypeScript types and SDK from GraphQL schema get-graphql-schema Fetch or build GraphQL schema SDL + Jobs: + jobs up Start combined server (jobs runtime) + Global Options: -h, --help Display this help information -v, --version Display version information @@ -27,6 +30,7 @@ export const usageText = ` cnc explorer Launch GraphiQL explorer cnc codegen --schema schema.graphql Generate types from schema cnc get-graphql-schema --out schema.graphql Export schema SDL + cnc jobs up Start combined server (jobs runtime) Database Operations: For database migrations, packages, and deployment, use pgpm: diff --git a/packages/server/CHANGELOG.md b/packages/server/CHANGELOG.md new file mode 100644 index 000000000..c29286903 --- /dev/null +++ b/packages/server/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## 0.1.0 + +- Initial combined server for GraphQL, jobs, and functions. diff --git a/packages/server/README.md b/packages/server/README.md new file mode 100644 index 000000000..60ee95a4e --- /dev/null +++ b/packages/server/README.md @@ -0,0 +1,77 @@ +# @constructive-io/server + +
+ +**Constructive Combined Server** starts GraphQL, jobs runtime, and Knative-style functions from a single entrypoint. + +## Quick Start + +### Use as SDK + +```ts +import { CombinedServer } from '@constructive-io/server'; + +await CombinedServer({ + graphql: { enabled: true }, + functions: { + enabled: true, + services: [ + { name: 'simple-email', port: 8081 }, + { name: 'send-email-link', port: 8082 } + ] + }, + jobs: { enabled: true } +}); +``` + +### Local Development (this repo) + +```bash +pnpm install +cd packages/server +pnpm dev +``` + +## Environment Configuration + +The `src/run.ts` entrypoint reads a small set of env flags for quick local orchestration: + +| Env var | Purpose | Default | +| --- | --- | --- | +| `CONSTRUCTIVE_GRAPHQL_ENABLED` | Start the GraphQL server | `true` | +| `CONSTRUCTIVE_JOBS_ENABLED` | Start the jobs runtime | `false` | +| `CONSTRUCTIVE_FUNCTIONS` | Comma-separated function list or `all` | empty | +| `CONSTRUCTIVE_FUNCTION_PORTS` | Port map (`name=port,name=port`) | none | + +Examples: + +```bash +# Start GraphQL only +CONSTRUCTIVE_GRAPHQL_ENABLED=true pnpm dev + +# Start only the simple-email function +CONSTRUCTIVE_GRAPHQL_ENABLED=false \ +CONSTRUCTIVE_FUNCTIONS=simple-email \ +CONSTRUCTIVE_FUNCTION_PORTS=simple-email=8081 \ +pnpm dev + +# Start all functions + jobs +CONSTRUCTIVE_FUNCTIONS=all \ +CONSTRUCTIVE_JOBS_ENABLED=true \ +CONSTRUCTIVE_FUNCTION_PORTS=simple-email=8081,send-email-link=8082 \ +pnpm dev +``` + +## Default Function Ports + +- `simple-email`: `8081` +- `send-email-link`: `8082` diff --git a/packages/server/jest.config.js b/packages/server/jest.config.js new file mode 100644 index 000000000..057a9420e --- /dev/null +++ b/packages/server/jest.config.js @@ -0,0 +1,18 @@ +/** @type {import('ts-jest').JestConfigWithTsJest} */ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + transform: { + '^.+\\.tsx?$': [ + 'ts-jest', + { + babelConfig: false, + tsconfig: 'tsconfig.json', + }, + ], + }, + transformIgnorePatterns: [`/node_modules/*`], + testRegex: '(/__tests__/.*|(\\.|/)(test|spec))\\.(jsx?|tsx?)$', + moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'json', 'node'], + modulePathIgnorePatterns: ['dist/*'] +}; diff --git a/packages/server/package.json b/packages/server/package.json new file mode 100644 index 000000000..c54683654 --- /dev/null +++ b/packages/server/package.json @@ -0,0 +1,56 @@ +{ + "name": "@constructive-io/server", + "version": "0.1.0", + "author": "Constructive ", + "description": "Combined Constructive server for GraphQL, jobs, and functions", + "main": "index.js", + "module": "esm/index.js", + "types": "index.d.ts", + "homepage": "https://github.com/constructive-io/constructive", + "license": "MIT", + "publishConfig": { + "access": "public", + "directory": "dist" + }, + "repository": { + "type": "git", + "url": "https://github.com/constructive-io/constructive" + }, + "bugs": { + "url": "https://github.com/constructive-io/constructive/issues" + }, + "scripts": { + "clean": "makage clean", + "prepack": "npm run build", + "build": "makage build", + "build:dev": "makage build --dev", + "dev": "ts-node src/run.ts", + "dev:watch": "nodemon --watch src --ext ts --exec ts-node src/run.ts", + "lint": "eslint . 
--fix", + "test": "jest --passWithNoTests", + "test:watch": "jest --watch" + }, + "keywords": [ + "server", + "constructive", + "graphql", + "jobs", + "functions", + "orchestrator" + ], + "dependencies": { + "@constructive-io/graphql-server": "workspace:^", + "@constructive-io/graphql-types": "workspace:^", + "@constructive-io/knative-job-fn": "workspace:^", + "@constructive-io/knative-job-service": "workspace:^", + "@constructive-io/send-email-link-fn": "workspace:^", + "@constructive-io/simple-email-fn": "workspace:^", + "@pgpmjs/env": "workspace:^", + "@pgpmjs/logger": "workspace:^" + }, + "devDependencies": { + "makage": "^0.1.10", + "nodemon": "^3.1.10", + "ts-node": "^10.9.2" + } +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts new file mode 100644 index 000000000..a53263507 --- /dev/null +++ b/packages/server/src/index.ts @@ -0,0 +1,3 @@ +export * from './server'; +export * from './types'; +export * from './run'; diff --git a/packages/server/src/run.ts b/packages/server/src/run.ts new file mode 100644 index 000000000..9665dec42 --- /dev/null +++ b/packages/server/src/run.ts @@ -0,0 +1,96 @@ +#!/usr/bin/env node + +import { parseEnvBoolean } from '@pgpmjs/env'; + +import { CombinedServer } from './server'; +import { + CombinedServerOptions, + CombinedServerResult, + FunctionName, + FunctionServiceConfig +} from './types'; + +const parseList = (value?: string): string[] => { + if (!value) return []; + return value + .split(',') + .map((item) => item.trim()) + .filter(Boolean); +}; + +const parsePortMap = (value?: string): Record => { + if (!value) return {}; + + const trimmed = value.trim(); + if (!trimmed) return {}; + + if (trimmed.startsWith('{')) { + try { + const parsed = JSON.parse(trimmed) as Record; + return Object.entries(parsed).reduce>((acc, [key, port]) => { + const portNumber = Number(port); + if (Number.isFinite(portNumber)) { + acc[key] = portNumber; + } + return acc; + }, {}); + } catch { + return {}; + } + } + + return trimmed.split(',').reduce>((acc, pair) => { + const [rawName, rawPort] = pair.split(/[:=]/).map((item) => item.trim()); + const port = Number(rawPort); + if (rawName && Number.isFinite(port)) { + acc[rawName] = port; + } + return acc; + }, {}); +}; + +const buildFunctionsOptions = (): CombinedServerOptions['functions'] => { + const rawFunctions = (process.env.CONSTRUCTIVE_FUNCTIONS || '').trim(); + if (!rawFunctions) return undefined; + + const portMap = parsePortMap(process.env.CONSTRUCTIVE_FUNCTION_PORTS); + const normalized = rawFunctions.toLowerCase(); + + if (normalized === 'all' || normalized === '*') { + return { enabled: true }; + } + + const names = parseList(rawFunctions) as FunctionName[]; + if (!names.length) return undefined; + + const services: FunctionServiceConfig[] = names.map((name) => ({ + name, + port: portMap[name] + })); + + return { + enabled: true, + services + }; +}; + +export const buildCombinedServerOptionsFromEnv = (): CombinedServerOptions => ({ + graphql: { + enabled: parseEnvBoolean(process.env.CONSTRUCTIVE_GRAPHQL_ENABLED) ?? true + }, + jobs: { + enabled: parseEnvBoolean(process.env.CONSTRUCTIVE_JOBS_ENABLED) ?? 
false + }, + functions: buildFunctionsOptions() +}); + +export const startCombinedServerFromEnv = async (): Promise => + CombinedServer(buildCombinedServerOptionsFromEnv()); + +if (require.main === module) { + void startCombinedServerFromEnv().catch((error) => { + // eslint-disable-next-line no-console + console.error('Combined server failed to start:', error); + process.exit(1); + }); +} diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts new file mode 100644 index 000000000..f1be8cc92 --- /dev/null +++ b/packages/server/src/server.ts @@ -0,0 +1,165 @@ +import { GraphQLServer } from '@constructive-io/graphql-server'; +import { bootJobs } from '@constructive-io/knative-job-service/dist/run'; +import { Logger } from '@pgpmjs/logger'; +import { createRequire } from 'module'; + +import { + CombinedServerOptions, + CombinedServerResult, + FunctionName, + FunctionServiceConfig, + FunctionsOptions, + StartedFunction +} from './types'; + +type FunctionRegistryEntry = { + moduleName: string; + defaultPort: number; +}; + +const functionRegistry: Record = { + 'simple-email': { + moduleName: '@constructive-io/simple-email-fn', + defaultPort: 8081 + }, + 'send-email-link': { + moduleName: '@constructive-io/send-email-link-fn', + defaultPort: 8082 + } +}; + +const log = new Logger('combined-server'); +const requireFn = createRequire(__filename); +const functionServers = new Map(); + +const resolveFunctionEntry = (name: FunctionName): FunctionRegistryEntry => { + const entry = functionRegistry[name]; + if (!entry) { + throw new Error(`Unknown function "${name}".`); + } + return entry; +}; + +const loadFunctionApp = (moduleName: string) => { + const knativeModuleId = requireFn.resolve('@constructive-io/knative-job-fn'); + delete requireFn.cache[knativeModuleId]; + + const moduleId = requireFn.resolve(moduleName); + delete requireFn.cache[moduleId]; + + const mod = requireFn(moduleName) as { default?: { listen: (port: number, cb?: () => void) => unknown } }; + const app = mod.default ?? mod; + + if (!app || typeof (app as { listen?: unknown }).listen !== 'function') { + throw new Error(`Function module "${moduleName}" does not export a listenable app.`); + } + + return app as { listen: (port: number, cb?: () => void) => unknown }; +}; + +const shouldEnableFunctions = (options?: FunctionsOptions): boolean => { + if (!options) return false; + if (typeof options.enabled === 'boolean') return options.enabled; + return Boolean(options.services?.length); +}; + +const normalizeFunctionServices = ( + options?: FunctionsOptions +): FunctionServiceConfig[] => { + if (!shouldEnableFunctions(options)) return []; + + if (!options?.services?.length) { + return Object.keys(functionRegistry).map((name) => ({ + name: name as FunctionName + })); + } + + return options.services; +}; + +const resolveFunctionPort = (service: FunctionServiceConfig): number => { + const entry = resolveFunctionEntry(service.name); + return service.port ?? 
entry.defaultPort; +}; + +const ensureUniquePorts = (services: FunctionServiceConfig[]) => { + const usedPorts = new Set(); + for (const service of services) { + const port = resolveFunctionPort(service); + if (usedPorts.has(port)) { + throw new Error(`Function port ${port} is assigned more than once.`); + } + usedPorts.add(port); + } +}; + +const startFunction = async ( + service: FunctionServiceConfig +): Promise => { + const entry = resolveFunctionEntry(service.name); + const port = resolveFunctionPort(service); + const app = loadFunctionApp(entry.moduleName); + + await new Promise((resolve, reject) => { + const server = app.listen(port, () => { + log.info(`function:${service.name} listening on ${port}`); + resolve(); + }) as { on?: (event: string, cb: (err: Error) => void) => void }; + + if (server?.on) { + server.on('error', (err) => { + log.error(`function:${service.name} failed to start`, err); + reject(err); + }); + } + + functionServers.set(service.name, server); + }); + + return { name: service.name, port }; +}; + +export const startFunctions = async ( + options?: FunctionsOptions +): Promise => { + const services = normalizeFunctionServices(options); + if (!services.length) return []; + + ensureUniquePorts(services); + + const started: StartedFunction[] = []; + for (const service of services) { + started.push(await startFunction(service)); + } + + return started; +}; + +export const CombinedServer = async ( + options: CombinedServerOptions = {} +): Promise => { + const result: CombinedServerResult = { + functions: [], + jobs: false, + graphql: false + }; + + if (options.graphql?.enabled) { + log.info('starting GraphQL server'); + GraphQLServer(options.graphql.options ?? {}); + result.graphql = true; + } + + if (shouldEnableFunctions(options.functions)) { + log.info('starting functions'); + result.functions = await startFunctions(options.functions); + } + + if (options.jobs?.enabled) { + log.info('starting jobs service'); + await bootJobs(); + result.jobs = true; + } + + return result; +}; diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts new file mode 100644 index 000000000..13753afcd --- /dev/null +++ b/packages/server/src/types.ts @@ -0,0 +1,39 @@ +import { ConstructiveOptions } from '@constructive-io/graphql-types'; + +export type FunctionName = 'simple-email' | 'send-email-link'; + +export type FunctionServiceConfig = { + name: FunctionName; + port?: number; +}; + +export type FunctionsOptions = { + enabled?: boolean; + services?: FunctionServiceConfig[]; +}; + +export type JobsOptions = { + enabled?: boolean; +}; + +export type GraphqlOptions = { + enabled?: boolean; + options?: ConstructiveOptions; +}; + +export type CombinedServerOptions = { + functions?: FunctionsOptions; + jobs?: JobsOptions; + graphql?: GraphqlOptions; +}; + +export type StartedFunction = { + name: FunctionName; + port: number; +}; + +export type CombinedServerResult = { + functions: StartedFunction[]; + jobs: boolean; + graphql: boolean; +}; diff --git a/packages/server/tsconfig.esm.json b/packages/server/tsconfig.esm.json new file mode 100644 index 000000000..800d7506d --- /dev/null +++ b/packages/server/tsconfig.esm.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "dist/esm", + "module": "es2022", + "rootDir": "src/", + "declaration": false + } +} diff --git a/packages/server/tsconfig.json b/packages/server/tsconfig.json new file mode 100644 index 000000000..1a9d5696c --- /dev/null +++ b/packages/server/tsconfig.json @@ -0,0 +1,9 @@ +{ 
+ "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src/" + }, + "include": ["src/**/*.ts"], + "exclude": ["dist", "node_modules", "**/*.spec.*", "**/*.test.*"] +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0aeed8c30..df791ed8d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -133,7 +133,7 @@ importers: version: 3.1.11 ts-node: specifier: ^10.9.2 - version: 10.9.2(@types/node@25.0.3)(typescript@5.9.3) + version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) publishDirectory: dist graphile/graphile-i18n: @@ -560,7 +560,7 @@ importers: version: 3.1.11 ts-node: specifier: ^10.9.2 - version: 10.9.2(@types/node@25.0.3)(typescript@5.9.3) + version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) publishDirectory: dist graphile/graphile-simple-inflector: @@ -829,7 +829,7 @@ importers: version: 3.1.11 ts-node: specifier: ^10.9.2 - version: 10.9.2(@types/node@25.0.3)(typescript@5.9.3) + version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) publishDirectory: dist graphql/gql-ast: @@ -1023,7 +1023,7 @@ importers: version: 3.1.11 ts-node: specifier: ^10.9.2 - version: 10.9.2(@types/node@25.0.3)(typescript@5.9.3) + version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) publishDirectory: dist graphql/test: @@ -1203,7 +1203,7 @@ importers: devDependencies: ts-node: specifier: ^10.9.2 - version: 10.9.2(@types/node@25.0.3)(typescript@5.9.3) + version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) jobs/knative-job-worker: dependencies: @@ -1241,6 +1241,9 @@ importers: '@constructive-io/graphql-server': specifier: workspace:^ version: link:../../graphql/server/dist + '@constructive-io/server': + specifier: workspace:^ + version: link:../server/dist '@inquirerer/utils': specifier: ^3.1.2 version: 3.1.2 @@ -1383,6 +1386,44 @@ importers: version: 0.1.10 publishDirectory: dist + packages/server: + dependencies: + '@constructive-io/graphql-server': + specifier: workspace:^ + version: link:../../graphql/server/dist + '@constructive-io/graphql-types': + specifier: workspace:^ + version: link:../../graphql/types/dist + '@constructive-io/knative-job-fn': + specifier: workspace:^ + version: link:../../jobs/knative-job-fn + '@constructive-io/knative-job-service': + specifier: workspace:^ + version: link:../../jobs/knative-job-service + '@constructive-io/send-email-link-fn': + specifier: workspace:^ + version: link:../../functions/send-email-link + '@constructive-io/simple-email-fn': + specifier: workspace:^ + version: link:../../functions/simple-email + '@pgpmjs/env': + specifier: workspace:^ + version: link:../../pgpm/env/dist + '@pgpmjs/logger': + specifier: workspace:^ + version: link:../../pgpm/logger/dist + devDependencies: + makage: + specifier: ^0.1.10 + version: 0.1.10 + nodemon: + specifier: ^3.1.10 + version: 3.1.11 + ts-node: + specifier: ^10.9.2 + version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) + publishDirectory: dist + packages/server-utils: dependencies: '@pgpmjs/logger': @@ -1412,7 +1453,7 @@ importers: version: 0.1.10 ts-node: specifier: ^10.9.2 - version: 10.9.2(@types/node@25.0.3)(typescript@5.9.3) + version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) publishDirectory: dist packages/url-domains: @@ -3440,9 +3481,6 @@ packages: '@types/node@20.19.27': resolution: {integrity: sha512-N2clP5pJhB2YnZJ3PIHFk5RkygRX5WO/5f0WC08tp0wd+sv0rsJk3MqWn3CbNmT2J505a5336jaQj4ph1AdMug==} - '@types/node@25.0.3': - resolution: {integrity: sha512-W609buLVRVmeW693xKfzHeIV6nJGGz98uCPfeXI1ELMLXVeKYZ9m15fAMSaUPBHYLGFsVRcMmSCksQOrZV9BYA==} - 
'@types/normalize-package-data@2.4.4': resolution: {integrity: sha512-37i+OaWTh9qeK4LSHPsyRC7NahnGotNuZvjLSgcPzblpHB3rrCJxAOgI5gCdKm7coonsaX1Of0ILiTcnZjbfxA==} @@ -7765,9 +7803,6 @@ packages: undici-types@6.21.0: resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==} - undici-types@7.16.0: - resolution: {integrity: sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==} - undici@7.16.0: resolution: {integrity: sha512-QEg3HPMll0o3t2ourKwOeUAZ159Kn9mx5pnzHRQO8+Wixmh88YdZRiIwat0iNzNNXn0yoEtXJqFpyW7eM8BV7g==} engines: {node: '>=20.18.1'} @@ -10332,10 +10367,6 @@ snapshots: dependencies: undici-types: 6.21.0 - '@types/node@25.0.3': - dependencies: - undici-types: 7.16.0 - '@types/normalize-package-data@2.4.4': {} '@types/pg-copy-streams@1.2.5': @@ -15537,24 +15568,6 @@ snapshots: v8-compile-cache-lib: 3.0.1 yn: 3.1.1 - ts-node@10.9.2(@types/node@25.0.3)(typescript@5.9.3): - dependencies: - '@cspotcode/source-map-support': 0.8.1 - '@tsconfig/node10': 1.0.12 - '@tsconfig/node12': 1.0.11 - '@tsconfig/node14': 1.0.3 - '@tsconfig/node16': 1.0.4 - '@types/node': 25.0.3 - acorn: 8.15.0 - acorn-walk: 8.3.4 - arg: 4.1.3 - create-require: 1.1.1 - diff: 4.0.2 - make-error: 1.3.6 - typescript: 5.9.3 - v8-compile-cache-lib: 3.0.1 - yn: 3.1.1 - tsconfig-paths@4.2.0: dependencies: json5: 2.2.3 @@ -15630,8 +15643,6 @@ snapshots: undici-types@6.21.0: {} - undici-types@7.16.0: {} - undici@7.16.0: {} unique-filename@3.0.0: From 6230fbc8fa9084ac2afd0ece3226dd0162435511 Mon Sep 17 00:00:00 2001 From: zetazzz Date: Thu, 8 Jan 2026 08:59:14 +0700 Subject: [PATCH 2/6] E2E Test cloud fns --- .github/workflows/jobs-e2e.yaml | 140 ++++++++++++++ packages/server/__tests__/jobs.e2e.test.ts | 215 +++++++++++++++++++++ packages/server/package.json | 4 + pnpm-lock.yaml | 135 +++++++++++++ 4 files changed, 494 insertions(+) create mode 100644 .github/workflows/jobs-e2e.yaml create mode 100644 packages/server/__tests__/jobs.e2e.test.ts diff --git a/.github/workflows/jobs-e2e.yaml b/.github/workflows/jobs-e2e.yaml new file mode 100644 index 000000000..d6173a78f --- /dev/null +++ b/.github/workflows/jobs-e2e.yaml @@ -0,0 +1,140 @@ +name: Jobs E2E +on: + push: + branches: + - main + - v1 + pull_request: + branches: + - main + - v1 + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-jobs-e2e + cancel-in-progress: true + +jobs: + jobs-e2e: + runs-on: ubuntu-latest + + env: + PGHOST: localhost + PGPORT: 5432 + PGUSER: postgres + PGPASSWORD: password + PGDATABASE: launchql + TEST_DB: launchql + TEST_GRAPHQL_URL: http://127.0.0.1:3000/graphql + TEST_GRAPHQL_HOST: admin.localhost + + services: + pg_db: + image: pyramation/pgvector:13.3-alpine + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Configure Git (for tests) + run: | + git config --global user.name "CI Test User" + git config --global user.email "ci@example.com" + + - name: checkout + uses: actions/checkout@v4 + + - name: checkout constructive-db + uses: actions/checkout@v4 + with: + repository: constructive-io/constructive-db + path: constructive-db + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: '20' + + - name: Setup pnpm + uses: pnpm/action-setup@v2 + with: + version: 10 + + - name: Install dependencies + 
run: pnpm install + + - name: Build packages + run: pnpm run build + + - name: Setup jobs database + run: | + PGDATABASE=postgres createdb launchql || true + pnpm --filter pgpm exec pgpm admin-users bootstrap --yes --cwd constructive-db + pnpm --filter pgpm exec pgpm admin-users add --test --yes --cwd constructive-db + pnpm --filter pgpm exec pgpm deploy --yes --database "$PGDATABASE" --package app-svc-local --cwd constructive-db + pnpm --filter pgpm exec pgpm deploy --yes --database "$PGDATABASE" --package metaschema --cwd constructive-db + pnpm --filter pgpm exec pgpm deploy --yes --database "$PGDATABASE" --package pgpm-database-jobs --cwd constructive-db + + - name: Resolve database id + run: | + DBID=$(psql -h "$PGHOST" -U "$PGUSER" -d "$PGDATABASE" -Atc "SELECT id FROM metaschema_public.database ORDER BY created_at LIMIT 1;") + if [ -z "$DBID" ]; then + echo "No database id found in metaschema_public.database" >&2 + exit 1 + fi + echo "TEST_DATABASE_ID=$DBID" >> "$GITHUB_ENV" + echo "DEFAULT_DATABASE_ID=$DBID" >> "$GITHUB_ENV" + + - name: Start combined server + env: + NODE_ENV: test + PORT: "3000" + SERVER_HOST: "127.0.0.1" + API_ENABLE_META: "false" + API_EXPOSED_SCHEMAS: "app_jobs,lql_private,lql_public,lql_roles_public,metaschema_modules_public,metaschema_public,services_public" + API_ANON_ROLE: "administrator" + API_ROLE_NAME: "administrator" + API_DEFAULT_DATABASE_ID: ${{ env.DEFAULT_DATABASE_ID }} + CONSTRUCTIVE_GRAPHQL_ENABLED: "true" + CONSTRUCTIVE_JOBS_ENABLED: "true" + CONSTRUCTIVE_FUNCTIONS: "simple-email,send-email-link" + CONSTRUCTIVE_FUNCTION_PORTS: "simple-email:8081,send-email-link:8082" + SIMPLE_EMAIL_DRY_RUN: "true" + SEND_EMAIL_LINK_DRY_RUN: "true" + LOCAL_APP_PORT: "3000" + GRAPHQL_URL: "http://127.0.0.1:3000/graphql" + META_GRAPHQL_URL: "http://127.0.0.1:3000/graphql" + GRAPHQL_HOST_HEADER: "admin.localhost" + META_GRAPHQL_HOST_HEADER: "admin.localhost" + MAILGUN_DOMAIN: "mg.constructive.io" + MAILGUN_FROM: "no-reply@mg.constructive.io" + MAILGUN_REPLY: "info@mg.constructive.io" + MAILGUN_API_KEY: "change-me-mailgun-api-key" + MAILGUN_KEY: "change-me-mailgun-api-key" + JOBS_SUPPORT_ANY: "false" + JOBS_SUPPORTED: "simple-email,send-email-link" + INTERNAL_GATEWAY_DEVELOPMENT_MAP: '{"simple-email":"http://127.0.0.1:8081","send-email-link":"http://127.0.0.1:8082"}' + INTERNAL_JOBS_CALLBACK_PORT: "8080" + JOBS_CALLBACK_BASE_URL: "http://127.0.0.1:8080/callback" + FEATURES_POSTGIS: "false" + run: | + nohup node packages/server/dist/run.js > /tmp/combined-server.log 2>&1 & + echo $! 
> /tmp/combined-server.pid + + - name: Test server jobs e2e + run: pnpm --filter @constructive-io/server test + + - name: Stop combined server + if: always() + run: | + if [ -f /tmp/combined-server.pid ]; then + kill "$(cat /tmp/combined-server.pid)" || true + fi diff --git a/packages/server/__tests__/jobs.e2e.test.ts b/packages/server/__tests__/jobs.e2e.test.ts new file mode 100644 index 000000000..3a39f207c --- /dev/null +++ b/packages/server/__tests__/jobs.e2e.test.ts @@ -0,0 +1,215 @@ +import supertest from 'supertest'; + +import { getConnections } from '@constructive-io/graphql-test'; + +jest.setTimeout(120000); + +const delay = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + +type GraphqlClient = { + http: ReturnType; + path: string; + host?: string; +}; + +const getGraphqlClient = (): GraphqlClient => { + const rawUrl = + process.env.TEST_GRAPHQL_URL || + process.env.GRAPHQL_URL || + 'http://localhost:3000/graphql'; + const parsed = new URL(rawUrl); + const origin = `${parsed.protocol}//${parsed.host}`; + const path = + parsed.pathname === '/' ? '/graphql' : `${parsed.pathname}${parsed.search}`; + const host = process.env.TEST_GRAPHQL_HOST || process.env.GRAPHQL_HOST; + + return { + http: supertest(origin), + path, + host + }; +}; + +const sendGraphql = async ( + client: GraphqlClient, + query: string, + variables?: Record +) => { + let req = client.http + .post(client.path) + .set('Content-Type', 'application/json'); + if (client.host) { + req = req.set('Host', client.host); + } + return req.send({ query, variables }); +}; + +const addJobMutation = ` + mutation AddJob($input: AddJobInput!) { + addJob(input: $input) { + job { + id + } + } + } +`; + +const jobByIdQuery = ` + query JobById($id: BigInt!) { + job(id: $id) { + id + lastError + attempts + } + } +`; + +const unwrapGraphqlData = ( + response: supertest.Response, + label: string +): T => { + if (response.status !== 200) { + throw new Error(`${label} failed: HTTP ${response.status}`); + } + if (response.body?.errors?.length) { + throw new Error( + `${label} failed: ${response.body.errors + .map((err: { message: string }) => err.message) + .join('; ')}` + ); + } + if (!response.body?.data) { + throw new Error(`${label} returned no data`); + } + return response.body.data as T; +}; + +const getJobById = async ( + client: GraphqlClient, + jobId: string | number +) => { + const response = await sendGraphql(client, jobByIdQuery, { + id: String(jobId) + }); + const data = unwrapGraphqlData<{ job: { lastError?: string | null; attempts?: number } | null }>( + response, + 'Job query' + ); + return data.job; +}; + +const waitForJobCompletion = async ( + client: GraphqlClient, + jobId: string | number +) => { + const timeoutMs = 30000; + const started = Date.now(); + + while (Date.now() - started < timeoutMs) { + const job = await getJobById(client, jobId); + + if (!job) return; + + if (job.lastError) { + const attempts = job.attempts ?? 
0; + throw new Error(`Job ${jobId} failed after ${attempts} attempt(s): ${job.lastError}`); + } + + await delay(250); + } + + throw new Error(`Job ${jobId} did not complete within ${timeoutMs}ms`); +}; + +describe('jobs e2e', () => { + let teardown: () => Promise; + let graphqlClient: GraphqlClient; + let databaseId = ''; + let pg: { oneOrNone?: (query: string, values?: unknown[]) => Promise } | undefined; + + beforeAll(async () => { + const targetDb = process.env.TEST_DB || process.env.PGDATABASE; + if (!targetDb) { + throw new Error('TEST_DB or PGDATABASE must point at the jobs database'); + } + process.env.TEST_DB = targetDb; + + ({ teardown, pg } = await getConnections( + { + schemas: ['app_jobs'], + authRole: 'administrator' + } + )); + + graphqlClient = getGraphqlClient(); + databaseId = process.env.TEST_DATABASE_ID ?? ''; + if (!databaseId && pg?.oneOrNone) { + const row = await pg.oneOrNone<{ id: string }>( + 'SELECT id FROM metaschema_public.database ORDER BY created_at LIMIT 1' + ); + databaseId = row?.id ?? ''; + } + if (!databaseId) { + throw new Error('TEST_DATABASE_ID is required or metaschema_public.database must contain a row'); + } + process.env.TEST_DATABASE_ID = databaseId; + }); + + afterAll(async () => { + if (teardown) { + await teardown(); + } + }); + + it('creates and processes a simple-email job', async () => { + const jobInput = { + dbId: databaseId, + identifier: 'simple-email', + payload: { + to: 'user@example.com', + subject: 'Jobs e2e', + html: '
jobs test
' + } + }; + + const response = await sendGraphql(graphqlClient, addJobMutation, { + input: jobInput + }); + + expect(response.status).toBe(200); + expect(response.body?.errors).toBeUndefined(); + + const jobId = response.body?.data?.addJob?.job?.id; + + expect(jobId).toBeTruthy(); + + await waitForJobCompletion(graphqlClient, jobId); + }); + + it('creates and processes a send-email-link job', async () => { + const jobInput = { + dbId: databaseId, + identifier: 'send-email-link', + payload: { + email_type: 'invite_email', + email: 'user@example.com', + invite_token: 'invite123', + sender_id: '00000000-0000-0000-0000-000000000000' + } + }; + + const response = await sendGraphql(graphqlClient, addJobMutation, { + input: jobInput + }); + + expect(response.status).toBe(200); + expect(response.body?.errors).toBeUndefined(); + + const jobId = response.body?.data?.addJob?.job?.id; + + expect(jobId).toBeTruthy(); + + await waitForJobCompletion(graphqlClient, jobId); + }); +}); diff --git a/packages/server/package.json b/packages/server/package.json index c54683654..9311cb050 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -49,8 +49,12 @@ "@pgpmjs/logger": "workspace:^" }, "devDependencies": { + "@constructive-io/graphql-test": "workspace:^", + "@types/supertest": "^6.0.3", "makage": "^0.1.10", "nodemon": "^3.1.10", + "pgsql-test": "workspace:^", + "supertest": "^7.2.2", "ts-node": "^10.9.2" } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index df791ed8d..c4a922126 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1413,12 +1413,24 @@ importers: specifier: workspace:^ version: link:../../pgpm/logger/dist devDependencies: + '@constructive-io/graphql-test': + specifier: workspace:^ + version: link:../../graphql/test/dist + '@types/supertest': + specifier: ^6.0.3 + version: 6.0.3 makage: specifier: ^0.1.10 version: 0.1.10 nodemon: specifier: ^3.1.10 version: 3.1.11 + pgsql-test: + specifier: workspace:^ + version: link:../../postgres/pgsql-test/dist + supertest: + specifier: ^7.2.2 + version: 7.2.2 ts-node: specifier: ^10.9.2 version: 10.9.2(@types/node@20.19.27)(typescript@5.9.3) @@ -2781,6 +2793,10 @@ packages: '@napi-rs/wasm-runtime@0.2.4': resolution: {integrity: sha512-9zESzOO5aDByvhIAsOy9TbpZ0Ur2AJbUI7UT73kcUTS2mxAMHOBaa1st/jAymNoCtvrit99kkzT1FZuXVcgfIQ==} + '@noble/hashes@1.8.0': + resolution: {integrity: sha512-jCs9ldd7NwzpgXDIf6P3+NrHh9/sD6CQdxHyjQI+h/6rDNo88ypBxxz45UDuZHz9r3tNz7N/VInSVoVdtXEI4A==} + engines: {node: ^14.21.3 || >=16} + '@nodelib/fs.scandir@2.1.5': resolution: {integrity: sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==} engines: {node: '>= 8'} @@ -2974,6 +2990,9 @@ packages: '@one-ini/wasm@0.1.1': resolution: {integrity: sha512-XuySG1E38YScSJoMlqovLru4KTUNSjgVTIjyh7qMX6aNN5HY5Ct5LhRJdxO79JtTzKfzV/bnWpz+zquYrISsvw==} + '@paralleldrive/cuid2@2.3.1': + resolution: {integrity: sha512-XO7cAxhnTZl0Yggq6jOgjiOHhbgcO4NqFqwSmQpjK3b6TEE6Uj/jfSk6wzYyemh3+I0sHirKSetjQwn5cZktFw==} + '@pgsql/types@17.6.2': resolution: {integrity: sha512-1UtbELdbqNdyOShhrVfSz3a1gDi0s9XXiQemx+6QqtsrXe62a6zOGU+vjb2GRfG5jeEokI1zBBcfD42enRv0Rw==} @@ -3394,6 +3413,9 @@ packages: '@types/content-disposition@0.5.9': resolution: {integrity: sha512-8uYXI3Gw35MhiVYhG3s295oihrxRyytcRHjSjqnqZVDDy/xcGBRny7+Xj1Wgfhv5QzRtN2hB2dVRBUX9XW3UcQ==} + '@types/cookiejar@2.1.5': + resolution: {integrity: sha512-he+DHOWReW0nghN24E1WUqM0efK4kI9oTqDm6XmK8ZPe2djZ90BSNdGnIyCLzCPw7/pogPlGbzI2wHGGmi4O/Q==} + '@types/cookies@0.9.2': resolution: {integrity: 
sha512-1AvkDdZM2dbyFybL4fxpuNCaWyv//0AwsuUk2DWeXyM1/5ZKm6W3z6mQi24RZ4l2ucY+bkSHzbDVpySqPGuV8A==} @@ -3466,6 +3488,9 @@ packages: '@types/koa@3.0.1': resolution: {integrity: sha512-VkB6WJUQSe0zBpR+Q7/YIUESGp5wPHcaXr0xueU5W0EOUWtlSbblsl+Kl31lyRQ63nIILh0e/7gXjQ09JXJIHw==} + '@types/methods@1.1.4': + resolution: {integrity: sha512-ymXWVrDiCxTBE3+RIrrP533E70eA+9qu7zdWoHuOmGujkYtzf4HQF96b8nwHLqhuf4ykX61IGRIB38CC6/sImQ==} + '@types/minimatch@3.0.5': resolution: {integrity: sha512-Klz949h02Gz2uZCMGwDUSDS1YBlTdDDgbWHi+81l29tQALUtvz4rAYi5uoVhE5Lagoq6DeqAUlbrHvW/mXDgdQ==} @@ -3517,6 +3542,12 @@ packages: '@types/stack-utils@2.0.3': resolution: {integrity: sha512-9aEbYZ3TbYMznPdcdr3SmIrLXwC/AKZXQeCf9Pgao5CKb8CyHuEX5jzWPTkvregvhRJHcpRO6BFoGW9ycaOkYw==} + '@types/superagent@8.1.9': + resolution: {integrity: sha512-pTVjI73witn+9ILmoJdajHGW2jkSaOzhiFYF1Rd3EQ94kymLqB9PjD9ISg7WaALC7+dCHT0FGe9T2LktLq/3GQ==} + + '@types/supertest@6.0.3': + resolution: {integrity: sha512-8WzXq62EXFhJ7QsH3Ocb/iKQ/Ty9ZVWnVzoTKc9tyyFRRF3a74Tk2+TLFgaFFw364Ere+npzHKEJ6ga2LzIL7w==} + '@types/testing-library__jest-dom@5.14.9': resolution: {integrity: sha512-FSYhIjFlfOpGSRyVoMBMuS3ws5ehFQODymf3vlI7U1K8c7PHwWwFY7VREfmsuzHSOnoKs/9/Y983ayOs7eRzqw==} @@ -3831,6 +3862,9 @@ packages: resolution: {integrity: sha512-3duEwti880xqi4eAMN8AyR4a0ByT90zoYdLlevfrvU43vb0YZwZVfxOgxWrLXXXpyugL0hNZc9G6BiB5B3nUug==} engines: {node: '>=8'} + asap@2.0.6: + resolution: {integrity: sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==} + asn1@0.2.6: resolution: {integrity: sha512-ix/FxPn0MDjeyJ7i/yoHGFt/EX6LyNbxSEhPPXODPL+KB0VPk86UYfL0lMdy+KCnv+fmvIzySwaK5COwqVbWTQ==} @@ -4237,6 +4271,9 @@ packages: compare-func@2.0.0: resolution: {integrity: sha512-zHig5N+tPWARooBnb0Zx1MFcdfpyJrfTJ3Y5L+IFvUm8rM74hHz66z0gw0x4tijh5CorKkKUCnW82R2vmpeCRA==} + component-emitter@1.3.1: + resolution: {integrity: sha512-T0+barUSQRTUQASh8bx02dl+DhF54GtIDY13Y3m9oWTklKbb3Wv974meRpeZ3lp1JpLVECWWNHC4vaG2XHXouQ==} + concat-map@0.0.1: resolution: {integrity: sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==} @@ -4300,6 +4337,9 @@ packages: resolution: {integrity: sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w==} engines: {node: '>= 0.6'} + cookiejar@2.1.4: + resolution: {integrity: sha512-LDx6oHrK+PhzLKJU9j5S7/Y3jM/mUHvD/DeI1WQmJn652iPC5Y4TBzC9l+5OMOXlyTTA+SmVUPm0HQUwpD5Jqw==} + copyfiles@2.4.1: resolution: {integrity: sha512-fereAvAvxDrQDOXybk3Qu3dPbOoKoysFMWtkY3mv5BsL8//OSZVL5DCLYqgRfY5cWirgRzlC+WSrxp6Bo3eNZg==} hasBin: true @@ -4517,6 +4557,9 @@ packages: detect-node@2.1.0: resolution: {integrity: sha512-T0NIuQpnTvFDATNuHN5roPwSBG83rFsuO+MXXH9/3N1eFbn4wcPjttvjMLEPWJ0RGUYgQE7cGgS3tNxbqCGM7g==} + dezalgo@1.0.4: + resolution: {integrity: sha512-rXSP0bf+5n0Qonsb+SVVfNfIsimO4HEtmnIpPHY8Q1UCzKlQrDMfdobr8nJOOsRgWCyMRqeSBQzmWUMq7zvVig==} + dicer@0.3.0: resolution: {integrity: sha512-MdceRRWqltEG2dZqO769g27N/3PXfcKl04VhYnBlo2YhH7zPi88VebsjTKclaOyiuMaGU72hTfw3VkUitGcVCA==} engines: {node: '>=4.5.0'} @@ -4969,6 +5012,9 @@ packages: fast-levenshtein@2.0.6: resolution: {integrity: sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==} + fast-safe-stringify@2.1.1: + resolution: {integrity: sha512-W+KJc2dmILlPplD/H4K9l9LcAHAfPtP6BY84uVLXQ6Evcz9Lcg33Y2z1IVblT6xdY54PXYVHEv+0Wpq8Io6zkA==} + fast-uri@3.1.0: resolution: {integrity: sha512-iPeeDKJSWf4IEOasVVrknXpaBV0IApz/gp7S2bb7Z4Lljbl2MGJRqInZiUrQwV16cpzw/D3S5j5Julj/gT52AA==} @@ 
-5071,6 +5117,10 @@ packages: resolution: {integrity: sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w==} engines: {node: '>= 6'} + formidable@3.5.4: + resolution: {integrity: sha512-YikH+7CUTOtP44ZTnUhR7Ic2UASBPOqmaRkRKxRbywPTe5VxF7RRCck4af9wutiZ/QKM5nME9Bie2fFaPz5Gug==} + engines: {node: '>=14.0.0'} + forwarded@0.2.0: resolution: {integrity: sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==} engines: {node: '>= 0.6'} @@ -6237,6 +6287,10 @@ packages: resolution: {integrity: sha512-8q7VEgMJW4J8tcfVPy8g09NcQwZdbwFEqhe/WZkoIzjn/3TGDwtOCYtXGxA3O8tPzpczCCDgv+P2P5y00ZJOOg==} engines: {node: '>= 8'} + methods@1.1.2: + resolution: {integrity: sha512-iclAHeNqNm68zFtnZ0e+1L2yUIdvzNoauKU4WBA3VvH/vPFieF7qfRlwUZU+DA9P9bPXIS90ulxoUoCH23sV2w==} + engines: {node: '>= 0.6'} + micromatch@4.0.8: resolution: {integrity: sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==} engines: {node: '>=8.6'} @@ -7094,6 +7148,10 @@ packages: resolution: {integrity: sha512-YWWTjgABSKcvs/nWBi9PycY/JiPJqOD4JA6o9Sej2AtvSGarXxKC3OQSk4pAarbdQlKAh5D4FCQkJNkW+GAn3w==} engines: {node: '>=0.6'} + qs@6.14.1: + resolution: {integrity: sha512-4EK3+xJl8Ts67nLYNwqw/dsFVnCf+qR7RgXSK9jEEm9unao3njwMDdmsdvoKBKHzxd7tCYz5e5M+SnMjdtXGQQ==} + engines: {node: '>=0.6'} + qs@6.5.3: resolution: {integrity: sha512-qxXIEh4pCGfHICj1mAJQ2/2XVZkjCDTcEgfoSQxc/fYivUZxTkk7L3bDBJSoNrEzXI17oUO5Dp07ktqE5KzczA==} engines: {node: '>=0.6'} @@ -7559,6 +7617,14 @@ packages: peerDependencies: graphql: '>=0.10.0' + superagent@10.3.0: + resolution: {integrity: sha512-B+4Ik7ROgVKrQsXTV0Jwp2u+PXYLSlqtDAhYnkkD+zn3yg8s/zjA2MeGayPoY/KICrbitwneDHrjSotxKL+0XQ==} + engines: {node: '>=14.18.0'} + + supertest@7.2.2: + resolution: {integrity: sha512-oK8WG9diS3DlhdUkcFn4tkNIiIbBx9lI2ClF8K+b2/m8Eyv47LSawxUzZQSNKUrVb2KsqeTDCcjAAVPYaSLVTA==} + engines: {node: '>=14.18.0'} + supports-color@5.5.0: resolution: {integrity: sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==} engines: {node: '>=4'} @@ -9426,6 +9492,8 @@ snapshots: '@emnapi/runtime': 1.7.1 '@tybys/wasm-util': 0.9.0 + '@noble/hashes@1.8.0': {} + '@nodelib/fs.scandir@2.1.5': dependencies: '@nodelib/fs.stat': 2.0.5 @@ -9677,6 +9745,10 @@ snapshots: '@one-ini/wasm@0.1.1': {} + '@paralleldrive/cuid2@2.3.1': + dependencies: + '@noble/hashes': 1.8.0 + '@pgsql/types@17.6.2': {} '@pgsql/utils@17.8.9': @@ -10255,6 +10327,8 @@ snapshots: '@types/content-disposition@0.5.9': {} + '@types/cookiejar@2.1.5': {} + '@types/cookies@0.9.2': dependencies: '@types/connect': 3.4.38 @@ -10353,6 +10427,8 @@ snapshots: '@types/koa-compose': 3.2.9 '@types/node': 20.19.27 + '@types/methods@1.1.4': {} + '@types/minimatch@3.0.5': {} '@types/minimist@1.2.5': {} @@ -10410,6 +10486,18 @@ snapshots: '@types/stack-utils@2.0.3': {} + '@types/superagent@8.1.9': + dependencies: + '@types/cookiejar': 2.1.5 + '@types/methods': 1.1.4 + '@types/node': 20.19.27 + form-data: 4.0.5 + + '@types/supertest@6.0.3': + dependencies: + '@types/methods': 1.1.4 + '@types/superagent': 8.1.9 + '@types/testing-library__jest-dom@5.14.9': dependencies: '@types/jest': 30.0.0 @@ -10707,6 +10795,8 @@ snapshots: arrify@2.0.1: {} + asap@2.0.6: {} + asn1@0.2.6: dependencies: safer-buffer: 2.1.2 @@ -11206,6 +11296,8 @@ snapshots: array-ify: 1.0.0 dot-prop: 5.3.0 + component-emitter@1.3.1: {} + concat-map@0.0.1: {} concat-stream@2.0.0: @@ -11284,6 +11376,8 @@ snapshots: cookie@0.7.2: {} + cookiejar@2.1.4: {} + 
copyfiles@2.4.1: dependencies: glob: 7.2.3 @@ -11475,6 +11569,11 @@ snapshots: detect-node@2.1.0: {} + dezalgo@1.0.4: + dependencies: + asap: 2.0.6 + wrappy: 1.0.2 + dicer@0.3.0: dependencies: streamsearch: 0.1.2 @@ -11889,6 +11988,8 @@ snapshots: fast-levenshtein@2.0.6: {} + fast-safe-stringify@2.1.1: {} + fast-uri@3.1.0: {} fast-xml-parser@5.2.5: @@ -12005,6 +12106,12 @@ snapshots: hasown: 2.0.2 mime-types: 2.1.35 + formidable@3.5.4: + dependencies: + '@paralleldrive/cuid2': 2.3.1 + dezalgo: 1.0.4 + once: 1.4.0 + forwarded@0.2.0: {} fresh@2.0.0: {} @@ -13713,6 +13820,8 @@ snapshots: merge2@1.4.1: {} + methods@1.1.2: {} + micromatch@4.0.8: dependencies: braces: 3.0.3 @@ -14865,6 +14974,10 @@ snapshots: dependencies: side-channel: 1.1.0 + qs@6.14.1: + dependencies: + side-channel: 1.1.0 + qs@6.5.3: {} qs@6.7.0: {} @@ -15405,6 +15518,28 @@ snapshots: - bufferutil - utf-8-validate + superagent@10.3.0: + dependencies: + component-emitter: 1.3.1 + cookiejar: 2.1.4 + debug: 4.4.3(supports-color@5.5.0) + fast-safe-stringify: 2.1.1 + form-data: 4.0.5 + formidable: 3.5.4 + methods: 1.1.2 + mime: 2.6.0 + qs: 6.14.1 + transitivePeerDependencies: + - supports-color + + supertest@7.2.2: + dependencies: + cookie-signature: 1.2.2 + methods: 1.1.2 + superagent: 10.3.0 + transitivePeerDependencies: + - supports-color + supports-color@5.5.0: dependencies: has-flag: 3.0.0 From 6fe4a31d618db3d478ad9c73e258a2d575b07585 Mon Sep 17 00:00:00 2001 From: zetazzz Date: Thu, 8 Jan 2026 15:17:45 +0700 Subject: [PATCH 3/6] refactor test on combined server --- functions/send-email-link/src/index.ts | 3 +- functions/simple-email/src/index.ts | 3 +- graphql/server/src/server.ts | 102 +++++- jobs/job-scheduler/src/index.ts | 71 +++- jobs/knative-job-example/src/index.ts | 4 +- jobs/knative-job-fn/src/index.ts | 272 ++++++++------- jobs/knative-job-worker/src/index.ts | 68 +++- packages/cli/src/commands/jobs.ts | 3 +- packages/server/README.md | 5 +- packages/server/__fixtures__/jobs.seed.sql | 67 ++++ packages/server/__tests__/jobs.e2e.test.ts | 384 +++++++++++++++++++-- packages/server/package.json | 13 + packages/server/src/run.ts | 6 +- packages/server/src/server.ts | 198 +++++++++-- pnpm-lock.yaml | 92 +++++ 15 files changed, 1081 insertions(+), 210 deletions(-) create mode 100644 packages/server/__fixtures__/jobs.seed.sql diff --git a/functions/send-email-link/src/index.ts b/functions/send-email-link/src/index.ts index f7b7e61ce..886ab0e27 100644 --- a/functions/send-email-link/src/index.ts +++ b/functions/send-email-link/src/index.ts @@ -1,4 +1,4 @@ -import app from '@constructive-io/knative-job-fn'; +import { createJobApp } from '@constructive-io/knative-job-fn'; import { GraphQLClient } from 'graphql-request'; import gql from 'graphql-tag'; import { generate } from '@launchql/mjml'; @@ -6,6 +6,7 @@ import { send } from '@launchql/postmaster'; import { parseEnvBoolean } from '@pgpmjs/env'; const isDryRun = parseEnvBoolean(process.env.SEND_EMAIL_LINK_DRY_RUN) ?? false; +const app = createJobApp(); const GetUser = gql` query GetUser($userId: UUID!) 
{ diff --git a/functions/simple-email/src/index.ts b/functions/simple-email/src/index.ts index 736423e0a..2227fbd34 100644 --- a/functions/simple-email/src/index.ts +++ b/functions/simple-email/src/index.ts @@ -1,4 +1,4 @@ -import app from '@constructive-io/knative-job-fn'; +import { createJobApp } from '@constructive-io/knative-job-fn'; import { parseEnvBoolean } from '@pgpmjs/env'; import { send as sendEmail } from '@launchql/postmaster'; @@ -26,6 +26,7 @@ const getRequiredField = ( }; const isDryRun = parseEnvBoolean(process.env.SIMPLE_EMAIL_DRY_RUN) ?? false; +const app = createJobApp(); app.post('/', async (req: any, res: any, next: any) => { try { diff --git a/graphql/server/src/server.ts b/graphql/server/src/server.ts index 70a234b75..db89c9827 100644 --- a/graphql/server/src/server.ts +++ b/graphql/server/src/server.ts @@ -1,13 +1,15 @@ import { getEnvOptions, getNodeEnv } from '@constructive-io/graphql-env'; import { Logger } from '@pgpmjs/logger'; -import { healthz, poweredBy, trustProxy } from '@pgpmjs/server-utils'; +import { healthz, poweredBy, svcCache, trustProxy } from '@pgpmjs/server-utils'; import { PgpmOptions } from '@pgpmjs/types'; import { middleware as parseDomains } from '@constructive-io/url-domains'; import express, { Express, RequestHandler } from 'express'; +import type { Server as HttpServer } from 'http'; // @ts-ignore import graphqlUpload from 'graphql-upload'; import { Pool, PoolClient } from 'pg'; -import { getPgPool } from 'pg-cache'; +import { graphileCache } from 'graphile-cache'; +import { getPgPool, pgCache } from 'pg-cache'; import requestIp from 'request-ip'; import { createApiMiddleware } from './middleware/api'; @@ -29,28 +31,34 @@ export const GraphQLServer = (rawOpts: PgpmOptions = {}) => { class Server { private app: Express; private opts: PgpmOptions; + private listenClient: PoolClient | null = null; + private listenRelease: (() => void) | null = null; + private shuttingDown = false; + private closed = false; + private httpServer: HttpServer | null = null; constructor(opts: PgpmOptions) { this.opts = getEnvOptions(opts); + const effectiveOpts = this.opts; const app = express(); - const api = createApiMiddleware(opts); - const authenticate = createAuthenticateMiddleware(opts); + const api = createApiMiddleware(effectiveOpts); + const authenticate = createAuthenticateMiddleware(effectiveOpts); // Log startup config in dev mode if (isDev()) { log.debug( - `Database: ${opts.pg?.database}@${opts.pg?.host}:${opts.pg?.port}` + `Database: ${effectiveOpts.pg?.database}@${effectiveOpts.pg?.host}:${effectiveOpts.pg?.port}` ); log.debug( - `Meta schemas: ${(opts as any).api?.metaSchemas?.join(', ') || 'default'}` + `Meta schemas: ${(effectiveOpts as any).api?.metaSchemas?.join(', ') || 'default'}` ); } healthz(app); - trustProxy(app, opts.server.trustProxy); + trustProxy(app, effectiveOpts.server.trustProxy); // Warn if a global CORS override is set in production - const fallbackOrigin = opts.server?.origin?.trim(); + const fallbackOrigin = effectiveOpts.server?.origin?.trim(); if (fallbackOrigin && process.env.NODE_ENV === 'production') { if (fallbackOrigin === '*') { log.warn( @@ -70,13 +78,13 @@ class Server { app.use(requestIp.mw()); app.use(api); app.use(authenticate); - app.use(graphile(opts)); + app.use(graphile(effectiveOpts)); app.use(flush); this.app = app; } - listen(): void { + listen(): HttpServer { const { server } = this.opts; const httpServer = this.app.listen(server?.port, server?.host, () => log.info(`listening at 
http://${server?.host}:${server?.port}`) @@ -90,6 +98,9 @@ class Server { } throw err; }); + + this.httpServer = httpServer; + return httpServer; } async flush(databaseId: string): Promise { @@ -101,6 +112,7 @@ class Server { } addEventListener(): void { + if (this.shuttingDown) return; const pgPool = this.getPool(); pgPool.connect(this.listenForChanges.bind(this)); } @@ -112,10 +124,20 @@ class Server { ): void { if (err) { this.error('Error connecting with notify listener', err); - setTimeout(() => this.addEventListener(), 5000); + if (!this.shuttingDown) { + setTimeout(() => this.addEventListener(), 5000); + } return; } + if (this.shuttingDown) { + release(); + return; + } + + this.listenClient = client; + this.listenRelease = release; + client.on('notification', ({ channel, payload }) => { if (channel === 'schema:update' && payload) { log.info('schema:update', payload); @@ -126,6 +148,10 @@ class Server { client.query('LISTEN "schema:update"'); client.on('error', (e) => { + if (this.shuttingDown) { + release(); + return; + } this.error('Error with database notify listener', e); release(); this.addEventListener(); @@ -134,6 +160,60 @@ class Server { this.log('connected and listening for changes...'); } + async removeEventListener(): Promise { + if (!this.listenClient || !this.listenRelease) { + return; + } + + const client = this.listenClient; + const release = this.listenRelease; + this.listenClient = null; + this.listenRelease = null; + + client.removeAllListeners('notification'); + client.removeAllListeners('error'); + + try { + await client.query('UNLISTEN "schema:update"'); + } catch { + // Ignore listener cleanup errors during shutdown. + } + + release(); + } + + async close(opts: { closeCaches?: boolean } = {}): Promise { + const { closeCaches = false } = opts; + if (this.closed) { + if (closeCaches) { + await Server.closeCaches({ closePools: true }); + } + return; + } + this.closed = true; + this.shuttingDown = true; + await this.removeEventListener(); + if (this.httpServer?.listening) { + await new Promise((resolve) => + this.httpServer!.close(() => resolve()) + ); + } + if (closeCaches) { + await Server.closeCaches({ closePools: true }); + } + } + + static async closeCaches( + opts: { closePools?: boolean } = {} + ): Promise { + const { closePools = false } = opts; + svcCache.clear(); + graphileCache.clear(); + if (closePools) { + await pgCache.close(); + } + } + log(text: string): void { log.info(text); } diff --git a/jobs/job-scheduler/src/index.ts b/jobs/job-scheduler/src/index.ts index a3d8df160..a051a16ac 100644 --- a/jobs/job-scheduler/src/index.ts +++ b/jobs/job-scheduler/src/index.ts @@ -25,6 +25,9 @@ export default class Scheduler { pgPool: Pool; jobs: Record; _initialized?: boolean; + listenClient?: PoolClient; + listenRelease?: () => void; + stopped?: boolean; constructor({ tasks, @@ -135,6 +138,7 @@ export default class Scheduler { this.jobs[id] = j as SchedulerJobHandle; } async doNext(client: PgClientLike): Promise { + if (this.stopped) return; if (!this._initialized) { return await this.initialize(client); } @@ -151,10 +155,12 @@ export default class Scheduler { : this.supportedTaskNames }); if (!job || !job.id) { - this.doNextTimer = setTimeout( - () => this.doNext(client), - this.idleDelay - ); + if (!this.stopped) { + this.doNextTimer = setTimeout( + () => this.doNext(client), + this.idleDelay + ); + } return; } const start = process.hrtime(); @@ -180,12 +186,21 @@ export default class Scheduler { } catch (fatalError: unknown) { await 
this.handleFatalError(client, { err, fatalError, jobId }); } - return this.doNext(client); + if (!this.stopped) { + return this.doNext(client); + } + return; } catch (err: unknown) { - this.doNextTimer = setTimeout(() => this.doNext(client), this.idleDelay); + if (!this.stopped) { + this.doNextTimer = setTimeout( + () => this.doNext(client), + this.idleDelay + ); + } } } listen() { + if (this.stopped) return; const listenForChanges = ( err: Error | null, client: PoolClient, @@ -198,9 +213,17 @@ export default class Scheduler { } // Try again in 5 seconds // should this really be done in the node process? - setTimeout(this.listen, 5000); + if (!this.stopped) { + setTimeout(this.listen, 5000); + } + return; + } + if (this.stopped) { + release(); return; } + this.listenClient = client; + this.listenRelease = release; client.on('notification', () => { log.info('a NEW scheduled JOB!'); if (this.doNextTimer) { @@ -210,12 +233,18 @@ export default class Scheduler { }); client.query('LISTEN "scheduled_jobs:insert"'); client.on('error', (e: unknown) => { + if (this.stopped) { + release(); + return; + } log.error('Error with database notify listener', e); if (e instanceof Error && e.stack) { log.debug(e.stack); } release(); - this.listen(); + if (!this.stopped) { + this.listen(); + } }); log.info( `${this.workerId} connected and looking for scheduled jobs...` @@ -224,6 +253,32 @@ export default class Scheduler { }; this.pgPool.connect(listenForChanges); } + + async stop(): Promise { + this.stopped = true; + if (this.doNextTimer) { + clearTimeout(this.doNextTimer); + this.doNextTimer = undefined; + } + Object.values(this.jobs).forEach((job) => job.cancel()); + this.jobs = {}; + + const client = this.listenClient; + const release = this.listenRelease; + this.listenClient = undefined; + this.listenRelease = undefined; + + if (client && release) { + client.removeAllListeners('notification'); + client.removeAllListeners('error'); + try { + await client.query('UNLISTEN "scheduled_jobs:insert"'); + } catch { + // Ignore listener cleanup errors during shutdown. + } + release(); + } + } } export { Scheduler }; diff --git a/jobs/knative-job-example/src/index.ts b/jobs/knative-job-example/src/index.ts index 5f8729847..84303ab19 100644 --- a/jobs/knative-job-example/src/index.ts +++ b/jobs/knative-job-example/src/index.ts @@ -1,4 +1,6 @@ -import app from '@constructive-io/knative-job-fn'; +import { createJobApp } from '@constructive-io/knative-job-fn'; + +const app = createJobApp(); app.post('/', async (req: any, res: any, next: any) => { if (req.body.throw) { diff --git a/jobs/knative-job-fn/src/index.ts b/jobs/knative-job-fn/src/index.ts index ff09e1d88..5e313e550 100644 --- a/jobs/knative-job-fn/src/index.ts +++ b/jobs/knative-job-fn/src/index.ts @@ -3,6 +3,7 @@ import bodyParser from 'body-parser'; import http from 'node:http'; import https from 'node:https'; import { URL } from 'node:url'; +import type { Server as HttpServer } from 'http'; type JobCallbackStatus = 'success' | 'error'; @@ -22,51 +23,6 @@ function getHeaders(req: any) { }; } -const app: any = express(); - -app.use(bodyParser.json()); - -// Basic request logging for all incoming job invocations. -app.use((req: any, res: any, next: any) => { - try { - // Log only the headers we care about plus a shallow body snapshot - const headers = getHeaders(req); - - let body: any; - if (req.body && typeof req.body === 'object') { - // Only log top-level keys to avoid exposing sensitive body contents. 
- body = { keys: Object.keys(req.body) }; - } else if (typeof req.body === 'string') { - // For string bodies, log only the length. - body = { length: req.body.length }; - } else { - body = undefined; - } - - // eslint-disable-next-line no-console - console.log('[knative-job-fn] Incoming job request', { - method: req.method, - path: req.originalUrl || req.url, - headers, - body - }); - } catch { - // best-effort logging; never block the request - } - next(); -}); - -// Echo job headers back on responses for debugging/traceability. -app.use((req: any, res: any, next: any) => { - res.set({ - 'Content-Type': 'application/json', - 'X-Worker-Id': req.get('X-Worker-Id'), - 'X-Database-Id': req.get('X-Database-Id'), - 'X-Job-Id': req.get('X-Job-Id') - }); - next(); -}); - // Normalize callback URL so it always points at the /callback endpoint. const normalizeCallbackUrl = (rawUrl: string): string => { try { @@ -171,98 +127,164 @@ const sendJobCallback = async ( } }; -// Attach per-request context and a finish hook to send success callbacks. -app.use((req: any, res: any, next: any) => { - const ctx: JobContext = { - callbackUrl: req.get('X-Callback-Url'), - workerId: req.get('X-Worker-Id'), - jobId: req.get('X-Job-Id'), - databaseId: req.get('X-Database-Id') - }; +const createJobApp = () => { + const app: any = express(); + + app.use(bodyParser.json()); - // Store on res.locals so the error middleware can also mark callbacks as sent. - res.locals = res.locals || {}; - res.locals.jobContext = ctx; - res.locals.jobCallbackSent = false; + // Basic request logging for all incoming job invocations. + app.use((req: any, res: any, next: any) => { + try { + // Log only the headers we care about plus a shallow body snapshot + const headers = getHeaders(req); + + let body: any; + if (req.body && typeof req.body === 'object') { + // Only log top-level keys to avoid exposing sensitive body contents. + body = { keys: Object.keys(req.body) }; + } else if (typeof req.body === 'string') { + // For string bodies, log only the length. + body = { length: req.body.length }; + } else { + body = undefined; + } - if (ctx.callbackUrl && ctx.workerId && ctx.jobId) { - res.on('finish', () => { - // If an error handler already sent a callback, skip. - if (res.locals.jobCallbackSent) return; - res.locals.jobCallbackSent = true; // eslint-disable-next-line no-console - console.log('[knative-job-fn] Function completed', { - workerId: ctx.workerId, - jobId: ctx.jobId, - databaseId: ctx.databaseId, - statusCode: res.statusCode + console.log('[knative-job-fn] Incoming job request', { + method: req.method, + path: req.originalUrl || req.url, + headers, + body }); - void sendJobCallback(ctx, 'success'); + } catch { + // best-effort logging; never block the request + } + next(); + }); + + // Echo job headers back on responses for debugging/traceability. + app.use((req: any, res: any, next: any) => { + res.set({ + 'Content-Type': 'application/json', + 'X-Worker-Id': req.get('X-Worker-Id'), + 'X-Database-Id': req.get('X-Database-Id'), + 'X-Job-Id': req.get('X-Job-Id') }); - } + next(); + }); - next(); -}); - -export default { - post: function (...args: any[]) { - return app.post.apply(app, args as any); - }, - listen: (port: any, cb: () => void = () => {}) => { - // NOTE Remember that Express middleware executes in order. - // You should define error handlers last, after all other middleware. 
- // Otherwise, your error handler won't get called - // eslint-disable-next-line no-unused-vars - app.use(async (error: any, req: any, res: any, next: any) => { - res.set({ - 'Content-Type': 'application/json', - 'X-Job-Error': true + // Attach per-request context and a finish hook to send success callbacks. + app.use((req: any, res: any, next: any) => { + const ctx: JobContext = { + callbackUrl: req.get('X-Callback-Url'), + workerId: req.get('X-Worker-Id'), + jobId: req.get('X-Job-Id'), + databaseId: req.get('X-Database-Id') + }; + + // Store on res.locals so the error middleware can also mark callbacks as sent. + res.locals = res.locals || {}; + res.locals.jobContext = ctx; + res.locals.jobCallbackSent = false; + + if (ctx.callbackUrl && ctx.workerId && ctx.jobId) { + res.on('finish', () => { + // If an error handler already sent a callback, skip. + if (res.locals.jobCallbackSent) return; + res.locals.jobCallbackSent = true; + // eslint-disable-next-line no-console + console.log('[knative-job-fn] Function completed', { + workerId: ctx.workerId, + jobId: ctx.jobId, + databaseId: ctx.databaseId, + statusCode: res.statusCode + }); + void sendJobCallback(ctx, 'success'); }); + } + + next(); + }); - // Mark job as having errored via callback, if available. - try { - const ctx: JobContext | undefined = res.locals?.jobContext; - if (ctx && !res.locals.jobCallbackSent) { - res.locals.jobCallbackSent = true; - await sendJobCallback(ctx, 'error', error?.message); + return { + post: function (...args: any[]) { + return app.post.apply(app, args as any); + }, + listen: ( + port: any, + hostOrCb?: string | (() => void), + cb: () => void = () => {} + ): HttpServer => { + // NOTE Remember that Express middleware executes in order. + // You should define error handlers last, after all other middleware. + // Otherwise, your error handler won't get called + // eslint-disable-next-line no-unused-vars + app.use(async (error: any, req: any, res: any, next: any) => { + res.set({ + 'Content-Type': 'application/json', + 'X-Job-Error': true + }); + + // Mark job as having errored via callback, if available. + try { + const ctx: JobContext | undefined = res.locals?.jobContext; + if (ctx && !res.locals.jobCallbackSent) { + res.locals.jobCallbackSent = true; + await sendJobCallback(ctx, 'error', error?.message); + } + } catch (err) { + // eslint-disable-next-line no-console + console.error('[knative-job-fn] Failed to send error callback', err); } - } catch (err) { - // eslint-disable-next-line no-console - console.error('[knative-job-fn] Failed to send error callback', err); - } - // Log the full error context for debugging. - try { - const headers = getHeaders(req); - - // Some error types (e.g. GraphQL ClientError) expose response info. - const errorDetails: any = { - message: error?.message, - name: error?.name, - stack: error?.stack - }; - - if (error?.response) { - errorDetails.response = { - status: error.response.status, - statusText: error.response.statusText, - errors: error.response.errors, - data: error.response.data + // Log the full error context for debugging. + try { + const headers = getHeaders(req); + + // Some error types (e.g. GraphQL ClientError) expose response info. 
+ const errorDetails: any = { + message: error?.message, + name: error?.name, + stack: error?.stack }; + + if (error?.response) { + errorDetails.response = { + status: error.response.status, + statusText: error.response.statusText, + errors: error.response.errors, + data: error.response.data + }; + } + + // eslint-disable-next-line no-console + console.error('[knative-job-fn] Function error', { + headers, + path: req.originalUrl || req.url, + error: errorDetails + }); + } catch { + // never throw from the error logger } - // eslint-disable-next-line no-console - console.error('[knative-job-fn] Function error', { - headers, - path: req.originalUrl || req.url, - error: errorDetails - }); - } catch { - // never throw from the error logger - } + res.status(200).json({ message: error.message }); + }); - res.status(200).json({ message: error.message }); - }); - return app.listen(port, cb); - } + const host = typeof hostOrCb === 'string' ? hostOrCb : undefined; + const callback = typeof hostOrCb === 'function' ? hostOrCb : cb; + const onListen = () => { + callback(); + }; + const server = host + ? app.listen(port, host, onListen) + : app.listen(port, onListen); + + return server; + } + }; }; + +const defaultApp = createJobApp(); + +export { createJobApp }; +export default defaultApp; diff --git a/jobs/knative-job-worker/src/index.ts b/jobs/knative-job-worker/src/index.ts index 6599aab29..5a54e6ad8 100644 --- a/jobs/knative-job-worker/src/index.ts +++ b/jobs/knative-job-worker/src/index.ts @@ -21,6 +21,9 @@ export default class Worker { doNextTimer?: NodeJS.Timeout; pgPool: Pool; _initialized?: boolean; + listenClient?: PoolClient; + listenRelease?: () => void; + stopped?: boolean; constructor({ tasks, @@ -118,6 +121,7 @@ export default class Worker { }); } async doNext(client: PgClientLike): Promise { + if (this.stopped) return; if (!this._initialized) { return await this.initialize(client); } @@ -136,10 +140,12 @@ export default class Worker { })) as JobRow | undefined; if (!job || !job.id) { - this.doNextTimer = setTimeout( - () => this.doNext(client), - this.idleDelay - ); + if (!this.stopped) { + this.doNextTimer = setTimeout( + () => this.doNext(client), + this.idleDelay + ); + } return; } const start = process.hrtime(); @@ -164,12 +170,21 @@ export default class Worker { } catch (fatalError: unknown) { await this.handleFatalError(client, { err, fatalError, jobId }); } - return this.doNext(client); + if (!this.stopped) { + return this.doNext(client); + } + return; } catch (err: unknown) { - this.doNextTimer = setTimeout(() => this.doNext(client), this.idleDelay); + if (!this.stopped) { + this.doNextTimer = setTimeout( + () => this.doNext(client), + this.idleDelay + ); + } } } listen() { + if (this.stopped) return; const listenForChanges = ( err: Error | null, client: PoolClient, @@ -182,9 +197,17 @@ export default class Worker { } // Try again in 5 seconds // should this really be done in the node process? - setTimeout(this.listen, 5000); + if (!this.stopped) { + setTimeout(this.listen, 5000); + } return; } + if (this.stopped) { + release(); + return; + } + this.listenClient = client; + this.listenRelease = release; client.on('notification', () => { if (this.doNextTimer) { // Must be idle, do something! 
@@ -193,18 +216,47 @@ export default class Worker { }); client.query('LISTEN "jobs:insert"'); client.on('error', (e: unknown) => { + if (this.stopped) { + release(); + return; + } log.error('Error with database notify listener', e); if (e instanceof Error && e.stack) { log.debug(e.stack); } release(); - this.listen(); + if (!this.stopped) { + this.listen(); + } }); log.info(`${this.workerId} connected and looking for jobs...`); this.doNext(client); }; this.pgPool.connect(listenForChanges); } + + async stop(): Promise { + this.stopped = true; + if (this.doNextTimer) { + clearTimeout(this.doNextTimer); + this.doNextTimer = undefined; + } + const client = this.listenClient; + const release = this.listenRelease; + this.listenClient = undefined; + this.listenRelease = undefined; + + if (client && release) { + client.removeAllListeners('notification'); + client.removeAllListeners('error'); + try { + await client.query('UNLISTEN "jobs:insert"'); + } catch { + // Ignore listener cleanup errors during shutdown. + } + release(); + } + } } export { Worker }; diff --git a/packages/cli/src/commands/jobs.ts b/packages/cli/src/commands/jobs.ts index 084d349b8..a1442d233 100644 --- a/packages/cli/src/commands/jobs.ts +++ b/packages/cli/src/commands/jobs.ts @@ -200,7 +200,8 @@ export default async ( try { ensureCwd((args.cwd as string) || process.cwd()); const promptAnswers = await prompter.prompt(args, questions); - await CombinedServer(buildCombinedServerOptions(promptAnswers)); + const server = new CombinedServer(buildCombinedServerOptions(promptAnswers)); + await server.start(); } catch (error) { await cliExitWithError( `Failed to start combined server: ${(error as Error).message}` diff --git a/packages/server/README.md b/packages/server/README.md index 60ee95a4e..b388eebd8 100644 --- a/packages/server/README.md +++ b/packages/server/README.md @@ -20,7 +20,7 @@ ```ts import { CombinedServer } from '@constructive-io/server'; -await CombinedServer({ +const server = new CombinedServer({ graphql: { enabled: true }, functions: { enabled: true, @@ -31,6 +31,9 @@ await CombinedServer({ }, jobs: { enabled: true } }); + +await server.start(); +// await server.stop(); ``` ### Local Development (this repo) diff --git a/packages/server/__fixtures__/jobs.seed.sql b/packages/server/__fixtures__/jobs.seed.sql new file mode 100644 index 000000000..49705e8e8 --- /dev/null +++ b/packages/server/__fixtures__/jobs.seed.sql @@ -0,0 +1,67 @@ +BEGIN; + +CREATE SCHEMA IF NOT EXISTS app_public; + +CREATE TABLE IF NOT EXISTS app_public.users ( + id uuid PRIMARY KEY, + username text NOT NULL, + display_name text, + profile_picture jsonb +); + +INSERT INTO app_public.users (id, username, display_name, profile_picture) +VALUES ( + '00000000-0000-0000-0000-000000000000', + 'sender', + 'Sender', + '{"url":"https://example.com/avatar.png","mime":"image/png"}'::jsonb +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO metaschema_public.database (id, name) +VALUES ('0b22e268-16d6-582b-950a-24e108688849', 'jobs-test') +ON CONFLICT (id) DO NOTHING; + +INSERT INTO services_public.sites (id, database_id, title, logo, dbname) +VALUES ( + '11111111-1111-1111-1111-111111111111', + '0b22e268-16d6-582b-950a-24e108688849', + 'Jobs Test', + '{"url":"https://example.com/logo.png","mime":"image/png"}'::jsonb, + current_database() +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO services_public.domains (id, database_id, site_id, domain, subdomain) +VALUES ( + '22222222-2222-2222-2222-222222222222', + '0b22e268-16d6-582b-950a-24e108688849', + 
'11111111-1111-1111-1111-111111111111', + 'localhost', + NULL +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO services_public.site_themes (id, database_id, site_id, theme) +VALUES ( + '33333333-3333-3333-3333-333333333333', + '0b22e268-16d6-582b-950a-24e108688849', + '11111111-1111-1111-1111-111111111111', + '{"primary":"#335C67"}'::jsonb +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO services_public.site_modules (id, database_id, site_id, name, data) +VALUES ( + '44444444-4444-4444-4444-444444444444', + '0b22e268-16d6-582b-950a-24e108688849', + '11111111-1111-1111-1111-111111111111', + 'legal_terms_module', + '{"emails":{"support":"support@example.com"},"company":{"name":"Constructive","nick":"Constructive","website":"https://constructive.io"}}'::jsonb +) +ON CONFLICT (id) DO NOTHING; + +GRANT USAGE ON SCHEMA app_public TO administrator; +GRANT SELECT, INSERT, UPDATE, DELETE ON app_public.users TO administrator; + +COMMIT; diff --git a/packages/server/__tests__/jobs.e2e.test.ts b/packages/server/__tests__/jobs.e2e.test.ts index 3a39f207c..2ba0d03da 100644 --- a/packages/server/__tests__/jobs.e2e.test.ts +++ b/packages/server/__tests__/jobs.e2e.test.ts @@ -1,6 +1,13 @@ +import { readFile } from 'fs/promises'; +import { createServer } from 'net'; +import { dirname, join } from 'path'; import supertest from 'supertest'; -import { getConnections } from '@constructive-io/graphql-test'; +import { PgpmInit, PgpmMigrate } from '@pgpmjs/core'; +import { getConnections, seed, type PgTestClient } from 'pgsql-test'; + +import type { CombinedServer as CombinedServerType } from '../src/server'; +import type { CombinedServerOptions, FunctionServiceConfig } from '../src/types'; jest.setTimeout(120000); @@ -13,16 +20,14 @@ type GraphqlClient = { host?: string; }; -const getGraphqlClient = (): GraphqlClient => { - const rawUrl = - process.env.TEST_GRAPHQL_URL || - process.env.GRAPHQL_URL || - 'http://localhost:3000/graphql'; +const buildGraphqlClient = ( + rawUrl: string, + host?: string +): GraphqlClient => { const parsed = new URL(rawUrl); const origin = `${parsed.protocol}//${parsed.host}`; const path = parsed.pathname === '/' ? 
'/graphql' : `${parsed.pathname}${parsed.search}`; - const host = process.env.TEST_GRAPHQL_HOST || process.env.GRAPHQL_HOST; return { http: supertest(origin), @@ -31,6 +36,16 @@ const getGraphqlClient = (): GraphqlClient => { }; }; +const getGraphqlClient = (): GraphqlClient => { + const rawUrl = + process.env.TEST_GRAPHQL_URL || + process.env.GRAPHQL_URL || + 'http://localhost:3000/graphql'; + const host = process.env.TEST_GRAPHQL_HOST || process.env.GRAPHQL_HOST; + + return buildGraphqlClient(rawUrl, host); +}; + const sendGraphql = async ( client: GraphqlClient, query: string, @@ -122,44 +137,361 @@ const waitForJobCompletion = async ( throw new Error(`Job ${jobId} did not complete within ${timeoutMs}ms`); }; +const seededDatabaseId = '0b22e268-16d6-582b-950a-24e108688849'; +const metaDbExtensions = ['citext', 'uuid-ossp', 'unaccent', 'pgcrypto', 'hstore']; + +const getPgpmModulePath = (pkgName: string): string => + dirname(require.resolve(`${pkgName}/pgpm.plan`)); + +const metaSeedModules = [ + getPgpmModulePath('@pgpm/verify'), + getPgpmModulePath('@pgpm/types'), + getPgpmModulePath('@pgpm/inflection'), + getPgpmModulePath('@pgpm/database-jobs'), + getPgpmModulePath('@pgpm/metaschema-schema'), + getPgpmModulePath('@pgpm/services'), + getPgpmModulePath('@pgpm/metaschema-modules') +]; + +const sql = (f: string) => join(__dirname, '..', '__fixtures__', f); + +type SeededConnections = { + db: PgTestClient; + pg: PgTestClient; + teardown: () => Promise; +}; + +type PgConfigLike = PgTestClient['config']; + +const runMetaMigrations = async (config: PgConfigLike) => { + const migrator = new PgpmMigrate(config); + for (const modulePath of metaSeedModules) { + const result = await migrator.deploy({ modulePath, usePlan: true }); + if (result.failed) { + throw new Error(`Failed to deploy ${modulePath}: ${result.failed}`); + } + } +}; + +const bootstrapAdminUsers = seed.fn(async ({ admin, config, connect }) => { + const roles = connect?.roles; + const connections = connect?.connections; + + if (!roles || !connections) { + throw new Error('Missing pgpm role or connection defaults for admin users.'); + } + + const init = new PgpmInit(config); + try { + await init.bootstrapRoles(roles); + await init.bootstrapTestRoles(roles, connections); + } finally { + await init.close(); + } + + const appUser = connections.app?.user; + if (appUser) { + await admin.grantRole(roles.administrator, appUser, config.database); + } +}); + +const deployMetaModules = seed.fn(async ({ config }) => { + await runMetaMigrations(config); +}); + +const createTestDb = async (): Promise => { + const { db, pg, teardown } = await getConnections( + { db: { extensions: metaDbExtensions } }, + [ + bootstrapAdminUsers, + deployMetaModules, + seed.sqlfile([sql('jobs.seed.sql')]) + ] + ); + + return { db, pg, teardown }; +}; + +const hasSchema = async (client: PgTestClient, schema: string) => { + const row = await client.oneOrNone<{ schema_name: string }>( + 'SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1', + [schema] + ); + return Boolean(row?.schema_name); +}; + +const ensureJobsSchema = async (client: PgTestClient) => { + if (await hasSchema(client, 'app_jobs')) return; + await runMetaMigrations(client.config); + if (!(await hasSchema(client, 'app_jobs'))) { + throw new Error('app_jobs schema was not created by pgpm migrations'); + } +}; + +const getAvailablePort = (): Promise => + new Promise((resolvePort, reject) => { + const server = createServer(); + server.listen(0, '127.0.0.1', () => { + const address 
= server.address(); + if (!address || typeof address === 'string') { + server.close(() => reject(new Error('Failed to allocate a port'))); + return; + } + const port = address.port; + server.close((err) => { + if (err) { + reject(err); + return; + } + resolvePort(port); + }); + }); + server.on('error', reject); + }); + +const waitForReady = async ( + label: string, + check: () => Promise, + timeoutMs = 30000, + getLastError?: () => string | undefined +) => { + const started = Date.now(); + + while (Date.now() - started < timeoutMs) { + try { + if (await check()) return; + } catch { + // ignore and retry + } + await delay(500); + } + + const lastError = getLastError?.(); + if (lastError) { + throw new Error( + `${label} did not become ready within ${timeoutMs}ms. Last error: ${lastError}` + ); + } + throw new Error(`${label} did not become ready within ${timeoutMs}ms.`); +}; + +const waitForGraphql = async (client: GraphqlClient) => { + let lastError: string | undefined; + await waitForReady( + 'GraphQL server', + async () => { + const response = await sendGraphql(client, '{ __typename }'); + if (response.status !== 200) { + const detail = + response.text || + (response.body ? JSON.stringify(response.body) : undefined); + lastError = detail + ? `HTTP ${response.status}: ${detail}` + : `HTTP ${response.status}`; + return false; + } + if (response.body?.errors?.length) { + lastError = response.body.errors + .map((err: { message: string }) => err.message) + .join('; '); + return false; + } + lastError = undefined; + return true; + }, + 30000, + () => lastError + ); +}; + +const waitForCallbackServer = async (callbackUrl: string) => { + const origin = callbackUrl.replace(/\/callback$/, ''); + const http = supertest(origin); + await waitForReady('Jobs callback server', async () => { + const response = await http.post('/callback').send({}); + return response.status === 200; + }); +}; + describe('jobs e2e', () => { let teardown: () => Promise; let graphqlClient: GraphqlClient; let databaseId = ''; - let pg: { oneOrNone?: (query: string, values?: unknown[]) => Promise } | undefined; + let pg: PgTestClient | undefined; + let combinedServer: CombinedServerType | null = null; + const envSnapshot: Record = { + NODE_ENV: process.env.NODE_ENV, + TEST_DB: process.env.TEST_DB, + PGHOST: process.env.PGHOST, + PGPORT: process.env.PGPORT, + PGUSER: process.env.PGUSER, + PGPASSWORD: process.env.PGPASSWORD, + PGDATABASE: process.env.PGDATABASE, + TEST_DATABASE_ID: process.env.TEST_DATABASE_ID, + DEFAULT_DATABASE_ID: process.env.DEFAULT_DATABASE_ID, + TEST_GRAPHQL_URL: process.env.TEST_GRAPHQL_URL, + TEST_GRAPHQL_HOST: process.env.TEST_GRAPHQL_HOST, + GRAPHQL_URL: process.env.GRAPHQL_URL, + META_GRAPHQL_URL: process.env.META_GRAPHQL_URL, + SIMPLE_EMAIL_DRY_RUN: process.env.SIMPLE_EMAIL_DRY_RUN, + SEND_EMAIL_LINK_DRY_RUN: process.env.SEND_EMAIL_LINK_DRY_RUN, + LOCAL_APP_PORT: process.env.LOCAL_APP_PORT, + MAILGUN_DOMAIN: process.env.MAILGUN_DOMAIN, + MAILGUN_FROM: process.env.MAILGUN_FROM, + MAILGUN_REPLY: process.env.MAILGUN_REPLY, + MAILGUN_API_KEY: process.env.MAILGUN_API_KEY, + MAILGUN_KEY: process.env.MAILGUN_KEY, + JOBS_SUPPORT_ANY: process.env.JOBS_SUPPORT_ANY, + JOBS_SUPPORTED: process.env.JOBS_SUPPORTED, + INTERNAL_GATEWAY_DEVELOPMENT_MAP: + process.env.INTERNAL_GATEWAY_DEVELOPMENT_MAP, + INTERNAL_JOBS_CALLBACK_PORT: process.env.INTERNAL_JOBS_CALLBACK_PORT, + JOBS_CALLBACK_BASE_URL: process.env.JOBS_CALLBACK_BASE_URL, + FEATURES_POSTGIS: process.env.FEATURES_POSTGIS + }; beforeAll(async () => { - const 
targetDb = process.env.TEST_DB || process.env.PGDATABASE; - if (!targetDb) { - throw new Error('TEST_DB or PGDATABASE must point at the jobs database'); - } - process.env.TEST_DB = targetDb; + delete process.env.TEST_DB; + delete process.env.PGDATABASE; - ({ teardown, pg } = await getConnections( - { - schemas: ['app_jobs'], - authRole: 'administrator' - } - )); + ({ teardown, pg } = await createTestDb()); + if (!pg) { + throw new Error('Test database connection is missing'); + } + await ensureJobsSchema(pg); - graphqlClient = getGraphqlClient(); - databaseId = process.env.TEST_DATABASE_ID ?? ''; - if (!databaseId && pg?.oneOrNone) { + databaseId = seededDatabaseId; + if (pg?.oneOrNone) { const row = await pg.oneOrNone<{ id: string }>( - 'SELECT id FROM metaschema_public.database ORDER BY created_at LIMIT 1' + 'SELECT id FROM metaschema_public.database WHERE id = $1', + [databaseId] ); - databaseId = row?.id ?? ''; + if (!row?.id) { + const seedSql = await readFile(sql('jobs.seed.sql'), 'utf8'); + await pg.query(seedSql); + const seeded = await pg.oneOrNone<{ id: string }>( + 'SELECT id FROM metaschema_public.database WHERE id = $1', + [databaseId] + ); + if (!seeded?.id) { + throw new Error(`Seeded database id ${databaseId} was not found`); + } + } } - if (!databaseId) { - throw new Error('TEST_DATABASE_ID is required or metaschema_public.database must contain a row'); + + if (!pg?.config.database) { + throw new Error('Test database config is missing a database name'); } + + const ports = { + graphqlPort: await getAvailablePort(), + callbackPort: await getAvailablePort(), + simpleEmailPort: await getAvailablePort(), + sendEmailLinkPort: await getAvailablePort() + }; + + const graphqlUrl = `http://127.0.0.1:${ports.graphqlPort}/graphql`; + const callbackUrl = `http://127.0.0.1:${ports.callbackPort}/callback`; + + process.env.NODE_ENV = 'test'; + process.env.TEST_DB = pg.config.database; + process.env.PGDATABASE = pg.config.database; process.env.TEST_DATABASE_ID = databaseId; + process.env.DEFAULT_DATABASE_ID = databaseId; + process.env.TEST_GRAPHQL_URL = graphqlUrl; + process.env.GRAPHQL_URL = graphqlUrl; + process.env.META_GRAPHQL_URL = graphqlUrl; + process.env.SIMPLE_EMAIL_DRY_RUN = 'true'; + process.env.SEND_EMAIL_LINK_DRY_RUN = 'true'; + process.env.LOCAL_APP_PORT = String(ports.graphqlPort); + process.env.MAILGUN_DOMAIN = 'mg.constructive.io'; + process.env.MAILGUN_FROM = 'no-reply@mg.constructive.io'; + process.env.MAILGUN_REPLY = 'info@mg.constructive.io'; + process.env.MAILGUN_API_KEY = 'change-me-mailgun-api-key'; + process.env.MAILGUN_KEY = 'change-me-mailgun-api-key'; + process.env.JOBS_SUPPORT_ANY = 'false'; + process.env.JOBS_SUPPORTED = 'simple-email,send-email-link'; + process.env.INTERNAL_GATEWAY_DEVELOPMENT_MAP = JSON.stringify({ + 'simple-email': `http://127.0.0.1:${ports.simpleEmailPort}`, + 'send-email-link': `http://127.0.0.1:${ports.sendEmailLinkPort}` + }); + process.env.INTERNAL_JOBS_CALLBACK_PORT = String(ports.callbackPort); + process.env.JOBS_CALLBACK_BASE_URL = callbackUrl; + process.env.FEATURES_POSTGIS = 'false'; + + if (pg.config.host) process.env.PGHOST = pg.config.host; + if (pg.config.port) process.env.PGPORT = String(pg.config.port); + if (pg.config.user) process.env.PGUSER = pg.config.user; + if (pg.config.password) process.env.PGPASSWORD = pg.config.password; + + const services: FunctionServiceConfig[] = [ + { name: 'simple-email', port: ports.simpleEmailPort }, + { name: 'send-email-link', port: ports.sendEmailLinkPort } + ]; + + const 
combinedServerOptions: CombinedServerOptions = { + graphql: { + enabled: true, + options: { + pg: { + host: pg.config.host, + port: pg.config.port, + user: pg.config.user, + password: pg.config.password, + database: pg.config.database + }, + server: { + host: '127.0.0.1', + port: ports.graphqlPort + }, + api: { + enableMetaApi: false, + exposedSchemas: [ + 'app_jobs', + 'app_public', + 'metaschema_modules_public', + 'metaschema_public', + 'services_public' + ], + anonRole: 'administrator', + roleName: 'administrator', + defaultDatabaseId: databaseId + }, + features: { + postgis: false + } + } + }, + functions: { + enabled: true, + services + }, + jobs: { enabled: true } + }; + + const { CombinedServer } = await import('../src/server'); + combinedServer = new CombinedServer(combinedServerOptions); + await combinedServer.start(); + + graphqlClient = getGraphqlClient(); + await waitForGraphql(graphqlClient); + await waitForCallbackServer(callbackUrl); }); afterAll(async () => { + if (combinedServer) { + await combinedServer.stop(); + } if (teardown) { await teardown(); } + for (const [key, value] of Object.entries(envSnapshot)) { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } + } }); it('creates and processes a simple-email job', async () => { diff --git a/packages/server/package.json b/packages/server/package.json index 9311cb050..fe7d7da7f 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -41,8 +41,13 @@ "dependencies": { "@constructive-io/graphql-server": "workspace:^", "@constructive-io/graphql-types": "workspace:^", + "@constructive-io/job-pg": "workspace:^", + "@constructive-io/job-scheduler": "workspace:^", + "@constructive-io/job-utils": "workspace:^", "@constructive-io/knative-job-fn": "workspace:^", + "@constructive-io/knative-job-server": "workspace:^", "@constructive-io/knative-job-service": "workspace:^", + "@constructive-io/knative-job-worker": "workspace:^", "@constructive-io/send-email-link-fn": "workspace:^", "@constructive-io/simple-email-fn": "workspace:^", "@pgpmjs/env": "workspace:^", @@ -50,6 +55,14 @@ }, "devDependencies": { "@constructive-io/graphql-test": "workspace:^", + "@pgpm/database-jobs": "^0.16.0", + "@pgpm/inflection": "^0.16.0", + "@pgpm/metaschema-modules": "^0.16.1", + "@pgpm/metaschema-schema": "^0.16.1", + "@pgpm/services": "^0.16.1", + "@pgpm/types": "^0.16.0", + "@pgpm/verify": "^0.16.0", + "@pgpmjs/core": "workspace:^", "@types/supertest": "^6.0.3", "makage": "^0.1.10", "nodemon": "^3.1.10", diff --git a/packages/server/src/run.ts b/packages/server/src/run.ts index 9665dec42..fa979dc69 100644 --- a/packages/server/src/run.ts +++ b/packages/server/src/run.ts @@ -84,8 +84,10 @@ export const buildCombinedServerOptionsFromEnv = (): CombinedServerOptions => ({ functions: buildFunctionsOptions() }); -export const startCombinedServerFromEnv = async (): Promise => - CombinedServer(buildCombinedServerOptionsFromEnv()); +export const startCombinedServerFromEnv = async (): Promise => { + const server = new CombinedServer(buildCombinedServerOptionsFromEnv()); + return server.start(); +}; if (require.main === module) { void startCombinedServerFromEnv().catch((error) => { diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index f1be8cc92..5c207a82a 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -1,7 +1,17 @@ -import { GraphQLServer } from '@constructive-io/graphql-server'; -import { bootJobs } from 
'@constructive-io/knative-job-service/dist/run'; +import { Server as GraphQLServer } from '@constructive-io/graphql-server'; +import jobServerFactory from '@constructive-io/knative-job-server'; +import Worker from '@constructive-io/knative-job-worker'; +import Scheduler from '@constructive-io/job-scheduler'; +import poolManager from '@constructive-io/job-pg'; +import { + getJobSupported, + getJobsCallbackPort, + getSchedulerHostname, + getWorkerHostname +} from '@constructive-io/job-utils'; import { Logger } from '@pgpmjs/logger'; import { createRequire } from 'module'; +import type { Server as HttpServer } from 'http'; import { CombinedServerOptions, @@ -30,7 +40,6 @@ const functionRegistry: Record = { const log = new Logger('combined-server'); const requireFn = createRequire(__filename); -const functionServers = new Map(); const resolveFunctionEntry = (name: FunctionName): FunctionRegistryEntry => { const entry = functionRegistry[name]; @@ -94,7 +103,8 @@ const ensureUniquePorts = (services: FunctionServiceConfig[]) => { }; const startFunction = async ( - service: FunctionServiceConfig + service: FunctionServiceConfig, + functionServers: Map ): Promise => { const entry = resolveFunctionEntry(service.name); const port = resolveFunctionPort(service); @@ -104,7 +114,7 @@ const startFunction = async ( const server = app.listen(port, () => { log.info(`function:${service.name} listening on ${port}`); resolve(); - }) as { on?: (event: string, cb: (err: Error) => void) => void }; + }) as HttpServer & { on?: (event: string, cb: (err: Error) => void) => void }; if (server?.on) { server.on('error', (err) => { @@ -119,8 +129,9 @@ const startFunction = async ( return { name: service.name, port }; }; -export const startFunctions = async ( - options?: FunctionsOptions +const startFunctions = async ( + options: FunctionsOptions | undefined, + functionServers: Map ): Promise => { const services = normalizeFunctionServices(options); if (!services.length) return []; @@ -129,37 +140,174 @@ export const startFunctions = async ( const started: StartedFunction[] = []; for (const service of services) { - started.push(await startFunction(service)); + started.push(await startFunction(service, functionServers)); } return started; }; -export const CombinedServer = async ( - options: CombinedServerOptions = {} -): Promise => { - const result: CombinedServerResult = { +type JobRunner = { + listen: () => void; + stop?: () => Promise | void; +}; + +const listenApp = async ( + app: { listen: (port: number, host?: string) => HttpServer }, + port: number, + host?: string +): Promise => + new Promise((resolveListen, rejectListen) => { + const server = host ? 
app.listen(port, host) : app.listen(port); + + const cleanup = () => { + server.off('listening', handleListen); + server.off('error', handleError); + }; + + const handleListen = () => { + cleanup(); + resolveListen(server); + }; + + const handleError = (err: Error) => { + cleanup(); + rejectListen(err); + }; + + server.once('listening', handleListen); + server.once('error', handleError); + }); + +const closeServer = async (server?: HttpServer | null): Promise => { + if (!server || !server.listening) return; + await new Promise((resolveClose, rejectClose) => { + server.close((err) => { + if (err) { + rejectClose(err); + return; + } + resolveClose(); + }); + }); +}; + +export class CombinedServer { + private options: CombinedServerOptions; + private started = false; + private result: CombinedServerResult = { functions: [], jobs: false, graphql: false }; + private graphqlServer?: GraphQLServer; + private graphqlHttpServer?: HttpServer; + private functionServers = new Map(); + private jobsHttpServer?: HttpServer; + private worker?: JobRunner; + private scheduler?: JobRunner; + private jobsPoolManager?: { close: () => Promise }; - if (options.graphql?.enabled) { - log.info('starting GraphQL server'); - GraphQLServer(options.graphql.options ?? {}); - result.graphql = true; + constructor(options: CombinedServerOptions = {}) { + this.options = options; } - if (shouldEnableFunctions(options.functions)) { - log.info('starting functions'); - result.functions = await startFunctions(options.functions); + async start(): Promise { + if (this.started) return this.result; + this.started = true; + this.result = { + functions: [], + jobs: false, + graphql: false + }; + + if (this.options.graphql?.enabled) { + log.info('starting GraphQL server'); + this.graphqlServer = new GraphQLServer( + this.options.graphql.options ?? 
{} + ); + this.graphqlServer.addEventListener(); + this.graphqlHttpServer = this.graphqlServer.listen(); + if (!this.graphqlHttpServer.listening) { + await new Promise((resolveListen) => { + this.graphqlHttpServer!.once('listening', () => resolveListen()); + }); + } + this.result.graphql = true; + } + + if (shouldEnableFunctions(this.options.functions)) { + log.info('starting functions'); + this.result.functions = await startFunctions( + this.options.functions, + this.functionServers + ); + } + + if (this.options.jobs?.enabled) { + log.info('starting jobs service'); + await this.startJobs(); + this.result.jobs = true; + } + + return this.result; } - if (options.jobs?.enabled) { - log.info('starting jobs service'); - await bootJobs(); - result.jobs = true; + async stop(): Promise { + if (!this.started) return; + this.started = false; + + if (this.worker?.stop) { + await this.worker.stop(); + } + if (this.scheduler?.stop) { + await this.scheduler.stop(); + } + this.worker = undefined; + this.scheduler = undefined; + + await closeServer(this.jobsHttpServer); + this.jobsHttpServer = undefined; + + if (this.jobsPoolManager) { + await this.jobsPoolManager.close(); + this.jobsPoolManager = undefined; + } + + for (const server of this.functionServers.values()) { + await closeServer(server); + } + this.functionServers.clear(); + + await closeServer(this.graphqlHttpServer); + this.graphqlHttpServer = undefined; + + if (this.graphqlServer?.close) { + await this.graphqlServer.close({ closeCaches: true }); + } + this.graphqlServer = undefined; } - return result; -}; + private async startJobs(): Promise { + const pgPool = poolManager.getPool(); + const jobsApp = jobServerFactory(pgPool); + const callbackPort = getJobsCallbackPort(); + this.jobsHttpServer = await listenApp(jobsApp, callbackPort); + + const tasks = getJobSupported(); + this.worker = new Worker({ + pgPool, + tasks, + workerId: getWorkerHostname() + }); + this.scheduler = new Scheduler({ + pgPool, + tasks, + workerId: getSchedulerHostname() + }); + + this.jobsPoolManager = poolManager; + + this.worker.listen(); + this.scheduler.listen(); + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d22fe9e02..0a41d97b7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1397,12 +1397,27 @@ importers: '@constructive-io/graphql-types': specifier: workspace:^ version: link:../../graphql/types/dist + '@constructive-io/job-pg': + specifier: workspace:^ + version: link:../../jobs/job-pg + '@constructive-io/job-scheduler': + specifier: workspace:^ + version: link:../../jobs/job-scheduler + '@constructive-io/job-utils': + specifier: workspace:^ + version: link:../../jobs/job-utils '@constructive-io/knative-job-fn': specifier: workspace:^ version: link:../../jobs/knative-job-fn + '@constructive-io/knative-job-server': + specifier: workspace:^ + version: link:../../jobs/knative-job-server '@constructive-io/knative-job-service': specifier: workspace:^ version: link:../../jobs/knative-job-service + '@constructive-io/knative-job-worker': + specifier: workspace:^ + version: link:../../jobs/knative-job-worker '@constructive-io/send-email-link-fn': specifier: workspace:^ version: link:../../functions/send-email-link @@ -1419,6 +1434,30 @@ importers: '@constructive-io/graphql-test': specifier: workspace:^ version: link:../../graphql/test/dist + '@pgpm/database-jobs': + specifier: ^0.16.0 + version: 0.16.0 + '@pgpm/inflection': + specifier: ^0.16.0 + version: 0.16.0 + '@pgpm/metaschema-modules': + specifier: ^0.16.1 + version: 0.16.1 + 
'@pgpm/metaschema-schema': + specifier: ^0.16.1 + version: 0.16.1 + '@pgpm/services': + specifier: ^0.16.1 + version: 0.16.1 + '@pgpm/types': + specifier: ^0.16.0 + version: 0.16.0 + '@pgpm/verify': + specifier: ^0.16.0 + version: 0.16.0 + '@pgpmjs/core': + specifier: workspace:^ + version: link:../../pgpm/core/dist '@types/supertest': specifier: ^6.0.3 version: 6.0.3 @@ -3155,6 +3194,27 @@ packages: '@paralleldrive/cuid2@2.3.1': resolution: {integrity: sha512-XO7cAxhnTZl0Yggq6jOgjiOHhbgcO4NqFqwSmQpjK3b6TEE6Uj/jfSk6wzYyemh3+I0sHirKSetjQwn5cZktFw==} + '@pgpm/database-jobs@0.16.0': + resolution: {integrity: sha512-s8I7958PlhfYXZKhYoU76R03yk6dlevjGk/Uy9uktveJkZ8C3JVsIhP6Lv4lo0SFEZCjFmXRCYpOY5xINIcX4w==} + + '@pgpm/inflection@0.16.0': + resolution: {integrity: sha512-otjWGx+KkB113Wc5I9nsvoqPhBK6zD1ON2OcXw9PQRgqU43Y9f0yZjb559dDzZwDn5XUeiZMf6il5SIvJE5NPg==} + + '@pgpm/metaschema-modules@0.16.1': + resolution: {integrity: sha512-qH0l4Xe0f0CSzXAC2nItu+ZpGliZ4eezl332HCLpI/bLkIMsmIZYlcjgiPmv7lZae+3uWbn7DQuDxeomsn5kBw==} + + '@pgpm/metaschema-schema@0.16.1': + resolution: {integrity: sha512-FwLy+z8pwfrBeQYErpcDpD55ZtB1X+Ghj6bbE28GVURBlUxmPY1llrLfKLqcA6xaKMyZ+aHOeBlKYRuyo9xdag==} + + '@pgpm/services@0.16.1': + resolution: {integrity: sha512-9wp3nstcTtsARw5cuE/x9Dwq/v7FQUPXlzjsBR/2V6z7oHBjOI8HiQ8y+tc1pnrFL1PJtcthkZKvBZbQBQJbTw==} + + '@pgpm/types@0.16.0': + resolution: {integrity: sha512-CioHCxZGQUnpLANw4aMOOq7Z6zi2SXCxJIRZ8CSBPJfJkWU1OgxX+EpSjnm4Td4bznJhOViXniLltibaaGkMPA==} + + '@pgpm/verify@0.16.0': + resolution: {integrity: sha512-uG0zTXAWGLV8wTUiLdBn+2b4AO+gtiw7sZf+TFFU8h/mVGMBTHUb9Gbsl/GL/5/0zZKOxak7cRJ5deec79KB/A==} + '@pgsql/types@17.6.2': resolution: {integrity: sha512-1UtbELdbqNdyOShhrVfSz3a1gDi0s9XXiQemx+6QqtsrXe62a6zOGU+vjb2GRfG5jeEokI1zBBcfD42enRv0Rw==} @@ -10005,6 +10065,38 @@ snapshots: dependencies: '@noble/hashes': 1.8.0 + '@pgpm/database-jobs@0.16.0': + dependencies: + '@pgpm/verify': 0.16.0 + + '@pgpm/inflection@0.16.0': + dependencies: + '@pgpm/verify': 0.16.0 + + '@pgpm/metaschema-modules@0.16.1': + dependencies: + '@pgpm/metaschema-schema': 0.16.1 + '@pgpm/services': 0.16.1 + '@pgpm/verify': 0.16.0 + + '@pgpm/metaschema-schema@0.16.1': + dependencies: + '@pgpm/database-jobs': 0.16.0 + '@pgpm/inflection': 0.16.0 + '@pgpm/types': 0.16.0 + '@pgpm/verify': 0.16.0 + + '@pgpm/services@0.16.1': + dependencies: + '@pgpm/metaschema-schema': 0.16.1 + '@pgpm/verify': 0.16.0 + + '@pgpm/types@0.16.0': + dependencies: + '@pgpm/verify': 0.16.0 + + '@pgpm/verify@0.16.0': {} + '@pgsql/types@17.6.2': {} '@pgsql/utils@17.8.11': From 14cc9ee609d5111d5a04fca1b228999f5ccfde3b Mon Sep 17 00:00:00 2001 From: zetazzz Date: Thu, 8 Jan 2026 15:52:44 +0700 Subject: [PATCH 4/6] jobs combined server tested in CI --- .github/workflows/jobs-e2e.yaml | 140 --------------------- .github/workflows/run-tests.yaml | 2 + packages/server/__tests__/jobs.e2e.test.ts | 1 - 3 files changed, 2 insertions(+), 141 deletions(-) delete mode 100644 .github/workflows/jobs-e2e.yaml diff --git a/.github/workflows/jobs-e2e.yaml b/.github/workflows/jobs-e2e.yaml deleted file mode 100644 index d6173a78f..000000000 --- a/.github/workflows/jobs-e2e.yaml +++ /dev/null @@ -1,140 +0,0 @@ -name: Jobs E2E -on: - push: - branches: - - main - - v1 - pull_request: - branches: - - main - - v1 - workflow_dispatch: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-jobs-e2e - cancel-in-progress: true - -jobs: - jobs-e2e: - runs-on: ubuntu-latest - - env: - PGHOST: localhost - PGPORT: 5432 - PGUSER: postgres - PGPASSWORD: password - 
PGDATABASE: launchql - TEST_DB: launchql - TEST_GRAPHQL_URL: http://127.0.0.1:3000/graphql - TEST_GRAPHQL_HOST: admin.localhost - - services: - pg_db: - image: pyramation/pgvector:13.3-alpine - env: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: password - options: >- - --health-cmd pg_isready - --health-interval 10s - --health-timeout 5s - --health-retries 5 - ports: - - 5432:5432 - - steps: - - name: Configure Git (for tests) - run: | - git config --global user.name "CI Test User" - git config --global user.email "ci@example.com" - - - name: checkout - uses: actions/checkout@v4 - - - name: checkout constructive-db - uses: actions/checkout@v4 - with: - repository: constructive-io/constructive-db - path: constructive-db - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Setup Node.js - uses: actions/setup-node@v4 - with: - node-version: '20' - - - name: Setup pnpm - uses: pnpm/action-setup@v2 - with: - version: 10 - - - name: Install dependencies - run: pnpm install - - - name: Build packages - run: pnpm run build - - - name: Setup jobs database - run: | - PGDATABASE=postgres createdb launchql || true - pnpm --filter pgpm exec pgpm admin-users bootstrap --yes --cwd constructive-db - pnpm --filter pgpm exec pgpm admin-users add --test --yes --cwd constructive-db - pnpm --filter pgpm exec pgpm deploy --yes --database "$PGDATABASE" --package app-svc-local --cwd constructive-db - pnpm --filter pgpm exec pgpm deploy --yes --database "$PGDATABASE" --package metaschema --cwd constructive-db - pnpm --filter pgpm exec pgpm deploy --yes --database "$PGDATABASE" --package pgpm-database-jobs --cwd constructive-db - - - name: Resolve database id - run: | - DBID=$(psql -h "$PGHOST" -U "$PGUSER" -d "$PGDATABASE" -Atc "SELECT id FROM metaschema_public.database ORDER BY created_at LIMIT 1;") - if [ -z "$DBID" ]; then - echo "No database id found in metaschema_public.database" >&2 - exit 1 - fi - echo "TEST_DATABASE_ID=$DBID" >> "$GITHUB_ENV" - echo "DEFAULT_DATABASE_ID=$DBID" >> "$GITHUB_ENV" - - - name: Start combined server - env: - NODE_ENV: test - PORT: "3000" - SERVER_HOST: "127.0.0.1" - API_ENABLE_META: "false" - API_EXPOSED_SCHEMAS: "app_jobs,lql_private,lql_public,lql_roles_public,metaschema_modules_public,metaschema_public,services_public" - API_ANON_ROLE: "administrator" - API_ROLE_NAME: "administrator" - API_DEFAULT_DATABASE_ID: ${{ env.DEFAULT_DATABASE_ID }} - CONSTRUCTIVE_GRAPHQL_ENABLED: "true" - CONSTRUCTIVE_JOBS_ENABLED: "true" - CONSTRUCTIVE_FUNCTIONS: "simple-email,send-email-link" - CONSTRUCTIVE_FUNCTION_PORTS: "simple-email:8081,send-email-link:8082" - SIMPLE_EMAIL_DRY_RUN: "true" - SEND_EMAIL_LINK_DRY_RUN: "true" - LOCAL_APP_PORT: "3000" - GRAPHQL_URL: "http://127.0.0.1:3000/graphql" - META_GRAPHQL_URL: "http://127.0.0.1:3000/graphql" - GRAPHQL_HOST_HEADER: "admin.localhost" - META_GRAPHQL_HOST_HEADER: "admin.localhost" - MAILGUN_DOMAIN: "mg.constructive.io" - MAILGUN_FROM: "no-reply@mg.constructive.io" - MAILGUN_REPLY: "info@mg.constructive.io" - MAILGUN_API_KEY: "change-me-mailgun-api-key" - MAILGUN_KEY: "change-me-mailgun-api-key" - JOBS_SUPPORT_ANY: "false" - JOBS_SUPPORTED: "simple-email,send-email-link" - INTERNAL_GATEWAY_DEVELOPMENT_MAP: '{"simple-email":"http://127.0.0.1:8081","send-email-link":"http://127.0.0.1:8082"}' - INTERNAL_JOBS_CALLBACK_PORT: "8080" - JOBS_CALLBACK_BASE_URL: "http://127.0.0.1:8080/callback" - FEATURES_POSTGIS: "false" - run: | - nohup node packages/server/dist/run.js > /tmp/combined-server.log 2>&1 & - echo $! 
> /tmp/combined-server.pid - - - name: Test server jobs e2e - run: pnpm --filter @constructive-io/server test - - - name: Stop combined server - if: always() - run: | - if [ -f /tmp/combined-server.pid ]; then - kill "$(cat /tmp/combined-server.pid)" || true - fi diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 52079a4cd..79e676bf3 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -33,6 +33,8 @@ jobs: env: {} - package: packages/cli env: {} + - package: packages/server + env: {} - package: packages/client env: TEST_DATABASE_URL: postgres://postgres:password@localhost:5432/postgres diff --git a/packages/server/__tests__/jobs.e2e.test.ts b/packages/server/__tests__/jobs.e2e.test.ts index 2ba0d03da..92c610241 100644 --- a/packages/server/__tests__/jobs.e2e.test.ts +++ b/packages/server/__tests__/jobs.e2e.test.ts @@ -394,7 +394,6 @@ describe('jobs e2e', () => { const callbackUrl = `http://127.0.0.1:${ports.callbackPort}/callback`; process.env.NODE_ENV = 'test'; - process.env.TEST_DB = pg.config.database; process.env.PGDATABASE = pg.config.database; process.env.TEST_DATABASE_ID = databaseId; process.env.DEFAULT_DATABASE_ID = databaseId; From 1ad6dde71ed76b10449dd5abc7bb5468472fe45f Mon Sep 17 00:00:00 2001 From: zetazzz Date: Thu, 8 Jan 2026 18:48:11 +0700 Subject: [PATCH 5/6] refine jobs e2e test --- packages/server/__tests__/jobs.e2e.test.ts | 144 ++------------------- 1 file changed, 14 insertions(+), 130 deletions(-) diff --git a/packages/server/__tests__/jobs.e2e.test.ts b/packages/server/__tests__/jobs.e2e.test.ts index 92c610241..a240eb333 100644 --- a/packages/server/__tests__/jobs.e2e.test.ts +++ b/packages/server/__tests__/jobs.e2e.test.ts @@ -1,5 +1,3 @@ -import { readFile } from 'fs/promises'; -import { createServer } from 'net'; import { dirname, join } from 'path'; import supertest from 'supertest'; @@ -139,6 +137,10 @@ const waitForJobCompletion = async ( const seededDatabaseId = '0b22e268-16d6-582b-950a-24e108688849'; const metaDbExtensions = ['citext', 'uuid-ossp', 'unaccent', 'pgcrypto', 'hstore']; +const GRAPHQL_PORT = 3000; +const CALLBACK_PORT = 8080; +const SIMPLE_EMAIL_PORT = 8081; +const SEND_EMAIL_LINK_PORT = 8082; const getPgpmModulePath = (pkgName: string): string => dirname(require.resolve(`${pkgName}/pgpm.plan`)); @@ -212,106 +214,7 @@ const createTestDb = async (): Promise => { return { db, pg, teardown }; }; -const hasSchema = async (client: PgTestClient, schema: string) => { - const row = await client.oneOrNone<{ schema_name: string }>( - 'SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1', - [schema] - ); - return Boolean(row?.schema_name); -}; - -const ensureJobsSchema = async (client: PgTestClient) => { - if (await hasSchema(client, 'app_jobs')) return; - await runMetaMigrations(client.config); - if (!(await hasSchema(client, 'app_jobs'))) { - throw new Error('app_jobs schema was not created by pgpm migrations'); - } -}; -const getAvailablePort = (): Promise => - new Promise((resolvePort, reject) => { - const server = createServer(); - server.listen(0, '127.0.0.1', () => { - const address = server.address(); - if (!address || typeof address === 'string') { - server.close(() => reject(new Error('Failed to allocate a port'))); - return; - } - const port = address.port; - server.close((err) => { - if (err) { - reject(err); - return; - } - resolvePort(port); - }); - }); - server.on('error', reject); - }); - -const waitForReady = async ( - label: string, - check: () 
=> Promise, - timeoutMs = 30000, - getLastError?: () => string | undefined -) => { - const started = Date.now(); - - while (Date.now() - started < timeoutMs) { - try { - if (await check()) return; - } catch { - // ignore and retry - } - await delay(500); - } - - const lastError = getLastError?.(); - if (lastError) { - throw new Error( - `${label} did not become ready within ${timeoutMs}ms. Last error: ${lastError}` - ); - } - throw new Error(`${label} did not become ready within ${timeoutMs}ms.`); -}; - -const waitForGraphql = async (client: GraphqlClient) => { - let lastError: string | undefined; - await waitForReady( - 'GraphQL server', - async () => { - const response = await sendGraphql(client, '{ __typename }'); - if (response.status !== 200) { - const detail = - response.text || - (response.body ? JSON.stringify(response.body) : undefined); - lastError = detail - ? `HTTP ${response.status}: ${detail}` - : `HTTP ${response.status}`; - return false; - } - if (response.body?.errors?.length) { - lastError = response.body.errors - .map((err: { message: string }) => err.message) - .join('; '); - return false; - } - lastError = undefined; - return true; - }, - 30000, - () => lastError - ); -}; - -const waitForCallbackServer = async (callbackUrl: string) => { - const origin = callbackUrl.replace(/\/callback$/, ''); - const http = supertest(origin); - await waitForReady('Jobs callback server', async () => { - const response = await http.post('/callback').send({}); - return response.status === 200; - }); -}; describe('jobs e2e', () => { let teardown: () => Promise; @@ -358,8 +261,6 @@ describe('jobs e2e', () => { if (!pg) { throw new Error('Test database connection is missing'); } - await ensureJobsSchema(pg); - databaseId = seededDatabaseId; if (pg?.oneOrNone) { const row = await pg.oneOrNone<{ id: string }>( @@ -367,15 +268,7 @@ describe('jobs e2e', () => { [databaseId] ); if (!row?.id) { - const seedSql = await readFile(sql('jobs.seed.sql'), 'utf8'); - await pg.query(seedSql); - const seeded = await pg.oneOrNone<{ id: string }>( - 'SELECT id FROM metaschema_public.database WHERE id = $1', - [databaseId] - ); - if (!seeded?.id) { - throw new Error(`Seeded database id ${databaseId} was not found`); - } + throw new Error(`Seeded database id ${databaseId} was not found`); } } @@ -383,15 +276,8 @@ describe('jobs e2e', () => { throw new Error('Test database config is missing a database name'); } - const ports = { - graphqlPort: await getAvailablePort(), - callbackPort: await getAvailablePort(), - simpleEmailPort: await getAvailablePort(), - sendEmailLinkPort: await getAvailablePort() - }; - - const graphqlUrl = `http://127.0.0.1:${ports.graphqlPort}/graphql`; - const callbackUrl = `http://127.0.0.1:${ports.callbackPort}/callback`; + const graphqlUrl = `http://127.0.0.1:${GRAPHQL_PORT}/graphql`; + const callbackUrl = `http://127.0.0.1:${CALLBACK_PORT}/callback`; process.env.NODE_ENV = 'test'; process.env.PGDATABASE = pg.config.database; @@ -402,7 +288,7 @@ describe('jobs e2e', () => { process.env.META_GRAPHQL_URL = graphqlUrl; process.env.SIMPLE_EMAIL_DRY_RUN = 'true'; process.env.SEND_EMAIL_LINK_DRY_RUN = 'true'; - process.env.LOCAL_APP_PORT = String(ports.graphqlPort); + process.env.LOCAL_APP_PORT = String(GRAPHQL_PORT); process.env.MAILGUN_DOMAIN = 'mg.constructive.io'; process.env.MAILGUN_FROM = 'no-reply@mg.constructive.io'; process.env.MAILGUN_REPLY = 'info@mg.constructive.io'; @@ -411,10 +297,10 @@ describe('jobs e2e', () => { process.env.JOBS_SUPPORT_ANY = 'false'; 
process.env.JOBS_SUPPORTED = 'simple-email,send-email-link'; process.env.INTERNAL_GATEWAY_DEVELOPMENT_MAP = JSON.stringify({ - 'simple-email': `http://127.0.0.1:${ports.simpleEmailPort}`, - 'send-email-link': `http://127.0.0.1:${ports.sendEmailLinkPort}` + 'simple-email': `http://127.0.0.1:${SIMPLE_EMAIL_PORT}`, + 'send-email-link': `http://127.0.0.1:${SEND_EMAIL_LINK_PORT}` }); - process.env.INTERNAL_JOBS_CALLBACK_PORT = String(ports.callbackPort); + process.env.INTERNAL_JOBS_CALLBACK_PORT = String(CALLBACK_PORT); process.env.JOBS_CALLBACK_BASE_URL = callbackUrl; process.env.FEATURES_POSTGIS = 'false'; @@ -424,8 +310,8 @@ describe('jobs e2e', () => { if (pg.config.password) process.env.PGPASSWORD = pg.config.password; const services: FunctionServiceConfig[] = [ - { name: 'simple-email', port: ports.simpleEmailPort }, - { name: 'send-email-link', port: ports.sendEmailLinkPort } + { name: 'simple-email', port: SIMPLE_EMAIL_PORT }, + { name: 'send-email-link', port: SEND_EMAIL_LINK_PORT } ]; const combinedServerOptions: CombinedServerOptions = { @@ -441,7 +327,7 @@ describe('jobs e2e', () => { }, server: { host: '127.0.0.1', - port: ports.graphqlPort + port: GRAPHQL_PORT }, api: { enableMetaApi: false, @@ -473,8 +359,6 @@ describe('jobs e2e', () => { await combinedServer.start(); graphqlClient = getGraphqlClient(); - await waitForGraphql(graphqlClient); - await waitForCallbackServer(callbackUrl); }); afterAll(async () => { From 309bc31aae23827ae09b827f2e0bf321965d0866 Mon Sep 17 00:00:00 2001 From: zetazzz Date: Fri, 9 Jan 2026 09:39:45 +0700 Subject: [PATCH 6/6] use default job callback port 12345 --- packages/server/__tests__/jobs.e2e.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/__tests__/jobs.e2e.test.ts b/packages/server/__tests__/jobs.e2e.test.ts index a240eb333..8451f186d 100644 --- a/packages/server/__tests__/jobs.e2e.test.ts +++ b/packages/server/__tests__/jobs.e2e.test.ts @@ -138,7 +138,7 @@ const waitForJobCompletion = async ( const seededDatabaseId = '0b22e268-16d6-582b-950a-24e108688849'; const metaDbExtensions = ['citext', 'uuid-ossp', 'unaccent', 'pgcrypto', 'hstore']; const GRAPHQL_PORT = 3000; -const CALLBACK_PORT = 8080; +const CALLBACK_PORT = 12345; const SIMPLE_EMAIL_PORT = 8081; const SEND_EMAIL_LINK_PORT = 8082;