Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/run-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ jobs:
env: {}
- package: packages/cli
env: {}
- package: packages/server
env: {}
- package: packages/client
env:
TEST_DATABASE_URL: postgres://postgres:password@localhost:5432/postgres
Expand Down
3 changes: 2 additions & 1 deletion functions/send-email-link/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import app from '@constructive-io/knative-job-fn';
import { createJobApp } from '@constructive-io/knative-job-fn';
import { GraphQLClient } from 'graphql-request';
import gql from 'graphql-tag';
import { generate } from '@launchql/mjml';
import { send } from '@launchql/postmaster';
import { parseEnvBoolean } from '@pgpmjs/env';

const isDryRun = parseEnvBoolean(process.env.SEND_EMAIL_LINK_DRY_RUN) ?? false;
const app = createJobApp();

const GetUser = gql`
query GetUser($userId: UUID!) {
Expand Down
3 changes: 2 additions & 1 deletion functions/simple-email/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import app from '@constructive-io/knative-job-fn';
import { createJobApp } from '@constructive-io/knative-job-fn';
import { parseEnvBoolean } from '@pgpmjs/env';
import { send as sendEmail } from '@launchql/postmaster';

Expand Down Expand Up @@ -26,6 +26,7 @@ const getRequiredField = (
};

const isDryRun = parseEnvBoolean(process.env.SIMPLE_EMAIL_DRY_RUN) ?? false;
const app = createJobApp();

app.post('/', async (req: any, res: any, next: any) => {
try {
Expand Down
102 changes: 91 additions & 11 deletions graphql/server/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { getEnvOptions, getNodeEnv } from '@constructive-io/graphql-env';
import { Logger } from '@pgpmjs/logger';
import { healthz, poweredBy, trustProxy } from '@pgpmjs/server-utils';
import { healthz, poweredBy, svcCache, trustProxy } from '@pgpmjs/server-utils';
import { PgpmOptions } from '@pgpmjs/types';
import { middleware as parseDomains } from '@constructive-io/url-domains';
import { randomUUID } from 'crypto';
import express, { Express, RequestHandler } from 'express';
import type { Server as HttpServer } from 'http';
// @ts-ignore
import graphqlUpload from 'graphql-upload';
import { Pool, PoolClient } from 'pg';
import { getPgPool } from 'pg-cache';
import { graphileCache } from 'graphile-cache';
import { getPgPool, pgCache } from 'pg-cache';
import requestIp from 'request-ip';

import { createApiMiddleware } from './middleware/api';
Expand All @@ -30,13 +32,19 @@ export const GraphQLServer = (rawOpts: PgpmOptions = {}) => {
class Server {
private app: Express;
private opts: PgpmOptions;
private listenClient: PoolClient | null = null;
private listenRelease: (() => void) | null = null;
private shuttingDown = false;
private closed = false;
private httpServer: HttpServer | null = null;

constructor(opts: PgpmOptions) {
this.opts = getEnvOptions(opts);
const effectiveOpts = this.opts;

const app = express();
const api = createApiMiddleware(opts);
const authenticate = createAuthenticateMiddleware(opts);
const api = createApiMiddleware(effectiveOpts);
const authenticate = createAuthenticateMiddleware(effectiveOpts);
const requestLogger: RequestHandler = (req, res, next) => {
const headerRequestId = req.header('x-request-id');
const reqId = headerRequestId || randomUUID();
Expand Down Expand Up @@ -72,17 +80,17 @@ class Server {
// Log startup config in dev mode
if (isDev()) {
log.debug(
`Database: ${opts.pg?.database}@${opts.pg?.host}:${opts.pg?.port}`
`Database: ${effectiveOpts.pg?.database}@${effectiveOpts.pg?.host}:${effectiveOpts.pg?.port}`
);
log.debug(
`Meta schemas: ${(opts as any).api?.metaSchemas?.join(', ') || 'default'}`
`Meta schemas: ${(effectiveOpts as any).api?.metaSchemas?.join(', ') || 'default'}`
);
}

healthz(app);
trustProxy(app, opts.server.trustProxy);
trustProxy(app, effectiveOpts.server.trustProxy);
// Warn if a global CORS override is set in production
const fallbackOrigin = opts.server?.origin?.trim();
const fallbackOrigin = effectiveOpts.server?.origin?.trim();
if (fallbackOrigin && process.env.NODE_ENV === 'production') {
if (fallbackOrigin === '*') {
log.warn(
Expand All @@ -103,13 +111,13 @@ class Server {
app.use(requestLogger);
app.use(api);
app.use(authenticate);
app.use(graphile(opts));
app.use(graphile(effectiveOpts));
app.use(flush);

this.app = app;
}

listen(): void {
listen(): HttpServer {
const { server } = this.opts;
const httpServer = this.app.listen(server?.port, server?.host, () =>
log.info(`listening at http://${server?.host}:${server?.port}`)
Expand All @@ -123,6 +131,9 @@ class Server {
}
throw err;
});

this.httpServer = httpServer;
return httpServer;
}

async flush(databaseId: string): Promise<void> {
Expand All @@ -134,6 +145,7 @@ class Server {
}

addEventListener(): void {
if (this.shuttingDown) return;
const pgPool = this.getPool();
pgPool.connect(this.listenForChanges.bind(this));
}
Expand All @@ -145,10 +157,20 @@ class Server {
): void {
if (err) {
this.error('Error connecting with notify listener', err);
setTimeout(() => this.addEventListener(), 5000);
if (!this.shuttingDown) {
setTimeout(() => this.addEventListener(), 5000);
}
return;
}

if (this.shuttingDown) {
release();
return;
}

this.listenClient = client;
this.listenRelease = release;

client.on('notification', ({ channel, payload }) => {
if (channel === 'schema:update' && payload) {
log.info('schema:update', payload);
Expand All @@ -159,6 +181,10 @@ class Server {
client.query('LISTEN "schema:update"');

client.on('error', (e) => {
if (this.shuttingDown) {
release();
return;
}
this.error('Error with database notify listener', e);
release();
this.addEventListener();
Expand All @@ -167,6 +193,60 @@ class Server {
this.log('connected and listening for changes...');
}

async removeEventListener(): Promise<void> {
if (!this.listenClient || !this.listenRelease) {
return;
}

const client = this.listenClient;
const release = this.listenRelease;
this.listenClient = null;
this.listenRelease = null;

client.removeAllListeners('notification');
client.removeAllListeners('error');

try {
await client.query('UNLISTEN "schema:update"');
} catch {
// Ignore listener cleanup errors during shutdown.
}

release();
}

async close(opts: { closeCaches?: boolean } = {}): Promise<void> {
const { closeCaches = false } = opts;
if (this.closed) {
if (closeCaches) {
await Server.closeCaches({ closePools: true });
}
return;
}
this.closed = true;
this.shuttingDown = true;
await this.removeEventListener();
if (this.httpServer?.listening) {
await new Promise<void>((resolve) =>
this.httpServer!.close(() => resolve())
);
}
if (closeCaches) {
await Server.closeCaches({ closePools: true });
}
}

static async closeCaches(
opts: { closePools?: boolean } = {}
): Promise<void> {
const { closePools = false } = opts;
svcCache.clear();
graphileCache.clear();
if (closePools) {
await pgCache.close();
}
}

log(text: string): void {
log.info(text);
}
Expand Down
71 changes: 63 additions & 8 deletions jobs/job-scheduler/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ export default class Scheduler {
pgPool: Pool;
jobs: Record<ScheduledJobRow['id'], SchedulerJobHandle>;
_initialized?: boolean;
listenClient?: PoolClient;
listenRelease?: () => void;
stopped?: boolean;

constructor({
tasks,
Expand Down Expand Up @@ -135,6 +138,7 @@ export default class Scheduler {
this.jobs[id] = j as SchedulerJobHandle;
}
async doNext(client: PgClientLike): Promise<void> {
if (this.stopped) return;
if (!this._initialized) {
return await this.initialize(client);
}
Expand All @@ -151,10 +155,12 @@ export default class Scheduler {
: this.supportedTaskNames
});
if (!job || !job.id) {
this.doNextTimer = setTimeout(
() => this.doNext(client),
this.idleDelay
);
if (!this.stopped) {
this.doNextTimer = setTimeout(
() => this.doNext(client),
this.idleDelay
);
}
return;
}
const start = process.hrtime();
Expand All @@ -180,12 +186,21 @@ export default class Scheduler {
} catch (fatalError: unknown) {
await this.handleFatalError(client, { err, fatalError, jobId });
}
return this.doNext(client);
if (!this.stopped) {
return this.doNext(client);
}
return;
} catch (err: unknown) {
this.doNextTimer = setTimeout(() => this.doNext(client), this.idleDelay);
if (!this.stopped) {
this.doNextTimer = setTimeout(
() => this.doNext(client),
this.idleDelay
);
}
}
}
listen() {
if (this.stopped) return;
const listenForChanges = (
err: Error | null,
client: PoolClient,
Expand All @@ -198,9 +213,17 @@ export default class Scheduler {
}
// Try again in 5 seconds
// should this really be done in the node process?
setTimeout(this.listen, 5000);
if (!this.stopped) {
setTimeout(this.listen, 5000);
}
return;
}
if (this.stopped) {
release();
return;
}
this.listenClient = client;
this.listenRelease = release;
client.on('notification', () => {
log.info('a NEW scheduled JOB!');
if (this.doNextTimer) {
Expand All @@ -210,12 +233,18 @@ export default class Scheduler {
});
client.query('LISTEN "scheduled_jobs:insert"');
client.on('error', (e: unknown) => {
if (this.stopped) {
release();
return;
}
log.error('Error with database notify listener', e);
if (e instanceof Error && e.stack) {
log.debug(e.stack);
}
release();
this.listen();
if (!this.stopped) {
this.listen();
}
});
log.info(
`${this.workerId} connected and looking for scheduled jobs...`
Expand All @@ -224,6 +253,32 @@ export default class Scheduler {
};
this.pgPool.connect(listenForChanges);
}

async stop(): Promise<void> {
this.stopped = true;
if (this.doNextTimer) {
clearTimeout(this.doNextTimer);
this.doNextTimer = undefined;
}
Object.values(this.jobs).forEach((job) => job.cancel());
this.jobs = {};

const client = this.listenClient;
const release = this.listenRelease;
this.listenClient = undefined;
this.listenRelease = undefined;

if (client && release) {
client.removeAllListeners('notification');
client.removeAllListeners('error');
try {
await client.query('UNLISTEN "scheduled_jobs:insert"');
} catch {
// Ignore listener cleanup errors during shutdown.
}
release();
}
}
}

export { Scheduler };
4 changes: 3 additions & 1 deletion jobs/knative-job-example/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import app from '@constructive-io/knative-job-fn';
import { createJobApp } from '@constructive-io/knative-job-fn';

const app = createJobApp();

app.post('/', async (req: any, res: any, next: any) => {
if (req.body.throw) {
Expand Down
Loading