72 changes: 72 additions & 0 deletions README.md
@@ -242,6 +242,78 @@ await gateway.listen(3000)
console.log('Cluster started with 4 workers')
```

#### Advanced usage: Cluster lifecycle and operations

Bungate’s cluster manager powers zero-downtime restarts, dynamic scaling, and safe shutdowns in production. You can control it via signals or programmatically.

- Zero-downtime rolling restart: send `SIGUSR2` to the master process
  - The manager spawns a replacement worker first, then gracefully stops the old one
- Graceful shutdown: send `SIGTERM` or `SIGINT`
  - Workers receive `SIGTERM` and are given up to `shutdownTimeout` to exit before being force-killed (see the worker sketch below)
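
For reference, here is a minimal worker-entry sketch that cooperates with this lifecycle. It is illustrative only: the `fetch` handler is a placeholder, and `reusePort` is an assumption so that multiple workers can share one port.

```ts
// gateway.ts — hypothetical worker entry (a sketch, not the shipped gateway)
const server = Bun.serve({
  port: 3000,
  reusePort: true, // assumption: let all workers bind the same port (SO_REUSEPORT)
  fetch: () => new Response('ok'),
})

// The cluster manager sends SIGTERM and waits up to `shutdownTimeout`
// before force-killing, so stop accepting connections and exit promptly.
process.on('SIGTERM', () => {
  server.stop()
  process.exit(0)
})
```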

Programmatic controls (available when using the `ClusterManager` directly):

```ts
import { ClusterManager, BunGateLogger } from 'bungate'

const logger = new BunGateLogger({ level: 'info' })

const cluster = new ClusterManager(
{
enabled: true,
workers: 4,
restartWorkers: true,
restartDelay: 1000, // base delay used for exponential backoff with jitter
maxRestarts: 10, // lifetime cap per worker
respawnThreshold: 5, // sliding window cap
respawnThresholdTime: 60_000, // within this time window
shutdownTimeout: 30_000,
// Set to false when embedding in tests to avoid process.exit(0)
exitOnShutdown: true,
},
logger,
'./gateway.ts', // worker entry (executed with Bun)
)

await cluster.start()

// Dynamic scaling
await cluster.scaleUp(2) // add 2 workers
await cluster.scaleDown(1) // remove 1 worker
await cluster.scaleTo(6) // set exact worker count

// Operational visibility
console.log(cluster.getWorkerCount())
console.log(cluster.getWorkerInfo()) // includes id, restarts, pid, etc.

// Broadcast a POSIX signal to all workers (e.g., for log-level reloads)
cluster.broadcastSignal('SIGHUP')

// Target a single worker
cluster.sendSignalToWorker(1, 'SIGHUP')

// Graceful shutdown (will exit process if exitOnShutdown !== false)
// await (cluster as any).gracefulShutdown() // internal API when run under the gateway; prefer sending SIGTERM
```

Notes:

- Each worker receives `CLUSTER_WORKER=true` and `CLUSTER_WORKER_ID=<n>` environment variables (used in the sketch below).
- Restart policy uses exponential backoff with jitter and a sliding window threshold to prevent flapping.
- Defaults: `shutdownTimeout` 30s, `respawnThreshold` 5 within 60s, `restartDelay` 1s, `maxRestarts` 10.
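
For example, a worker can read its cluster identity from those variables and react to a broadcast `SIGHUP`. A sketch, assuming the worker implements its own reload behavior (Bungate assigns no built-in meaning to `SIGHUP`):

```ts
// Inside the worker entry: detect cluster mode and this worker's id.
const inCluster = process.env.CLUSTER_WORKER === 'true'
const workerId = process.env.CLUSTER_WORKER_ID ?? '0'

// Hypothetical handler for `cluster.broadcastSignal('SIGHUP')`:
// toggle the log level without restarting the process.
let level: 'info' | 'debug' = 'info'
process.on('SIGHUP', () => {
  level = level === 'info' ? 'debug' : 'info'
  console.log(`[worker ${workerId}] log level set to ${level} (cluster=${inCluster})`)
})
```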

Configuration reference (cluster):

- `enabled` (boolean): enable multi-process mode
- `workers` (number): worker process count (defaults to CPU cores)
- `restartWorkers` (boolean): auto-respawn crashed workers
- `restartDelay` (ms): base delay for backoff
- `maxRestarts` (number): lifetime restarts per worker
- `respawnThreshold` (number): max restarts within time window
- `respawnThresholdTime` (ms): sliding window size
- `shutdownTimeout` (ms): grace period before force-kill
- `exitOnShutdown` (boolean): if true (default), master exits after shutdown; set false in tests/embedded
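
A minimal configuration can rely on those defaults; this sketch reuses `logger` and the worker entry from the earlier example:

```ts
const minimal = new ClusterManager(
  // restartDelay, maxRestarts, respawnThreshold/-Time, and shutdownTimeout
  // all fall back to the defaults listed above.
  { enabled: true, workers: 4, restartWorkers: true },
  logger,
  './gateway.ts',
)
await minimal.start()
```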

### 🔄 **Advanced Load Balancing**

Distribute traffic intelligently across multiple backends:
207 changes: 165 additions & 42 deletions src/cluster/cluster-manager.ts
@@ -47,6 +47,11 @@ export interface WorkerInfo {
lastRestartTime: number
/** Flag indicating worker is in shutdown process */
isExiting: boolean
/**
* Timestamps of recent restarts used to enforce respawn threshold within a time window
* Old entries are pruned automatically on access
*/
restartTimestamps: number[]
}

/**
@@ -68,6 +73,12 @@ export class ClusterManager {
private isShuttingDown = false
/** Path to worker script for spawning processes */
private workerScript: string
/** Guard to prevent double start */
private started = false
/** Bound signal handlers to allow proper removal */
private boundSigint?: () => void
private boundSigterm?: () => void
private boundSigusr2?: () => void

/**
* Initialize cluster manager with configuration and dependencies
@@ -106,54 +117,70 @@
return
}

if (this.started) {
this.logger?.warn('Cluster already started; ignoring subsequent start()')
return
}
this.started = true

this.logger?.info(`Starting cluster with ${this.config.workers} workers`)

// Configure signal handlers for graceful lifecycle management
this.boundSigint = this.gracefulShutdown.bind(this)
this.boundSigterm = this.gracefulShutdown.bind(this)
this.boundSigusr2 = this.restartAllWorkers.bind(this)
process.on('SIGINT', this.boundSigint)
process.on('SIGTERM', this.boundSigterm)
process.on('SIGUSR2', this.boundSigusr2)

// Calculate number of workers, ensuring at least one
const maxWorkers = Math.max(1, this.config.workers || cpus().length)

// Spawn workers
for (let i = 0; i < maxWorkers; i++) {
try {
await this.spawnWorker()
} catch (error) {
this.logger?.error(`Failed to spawn worker ${i}:`, error as Error)
}
}

this.logger?.info('Cluster started successfully')
}

private async spawnWorker(workerId?: number): Promise<void> {
const id = workerId ?? this.nextWorkerId++

try {
const worker = spawn({
cmd: [process.execPath, this.workerScript],
env: {
...process.env,
CLUSTER_WORKER: 'true',
CLUSTER_WORKER_ID: id.toString(),
},
stdio: ['inherit', 'inherit', 'inherit'],
})

const workerInfo: WorkerInfo = {
id,
process: worker,
restarts: 0,
lastRestartTime: 0,
isExiting: false,
restartTimestamps: [],
}

this.workers.set(id, workerInfo)

// Handle worker exit
worker.exited.then((exitCode) => {
this.handleWorkerExit(workerInfo, exitCode)
})

this.logger?.info(`Worker ${id} started (PID: ${worker.pid})`)
} catch (error) {
this.logger?.error(`Failed to spawn worker ${id}:`, error as Error)
}
}

@@ -174,16 +201,25 @@ export class ClusterManager {
// Check if we should restart the worker
if (this.config.restartWorkers && this.shouldRestartWorker(workerInfo)) {
this.logger?.info(`Restarting worker ${id}`)
// Track restart metrics
const now = Date.now()
workerInfo.restarts++
workerInfo.lastRestartTime = now
workerInfo.restartTimestamps.push(now)
// Apply exponential backoff with jitter based on restarts count
const base = Math.max(0, this.config.restartDelay ?? 1000)
const attempt = Math.max(1, workerInfo.restarts)
const maxDelay = 30000 // cap at 30s to avoid unbounded waits
const backoff = Math.min(
maxDelay,
base * Math.pow(2, Math.min(5, attempt - 1)),
)
const jitter = Math.floor(Math.random() * Math.floor(base / 2))
const delay = Math.min(maxDelay, backoff + jitter)
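// e.g. with base=1000: attempt 1 → ~1s, attempt 3 → ~4s, attempt 6+ → capped at 30s (plus up to ~0.5s jitter)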
await new Promise((resolve) => setTimeout(resolve, delay))

try {
await this.spawnWorker(id)
} catch (error) {
this.logger?.error(`Failed to restart worker ${id}:`, error as Error)
}
@@ -195,43 +231,48 @@
}

private shouldRestartWorker(workerInfo: WorkerInfo): boolean {
const { restarts } = workerInfo
const now = Date.now()

// Check max restarts (lifetime)
if (
typeof this.config.maxRestarts === 'number' &&
restarts >= (this.config.maxRestarts ?? 10)
) {
return false
}

// Enforce respawn threshold within time window using sliding window timestamps
const threshold = this.config.respawnThreshold ?? 5
const windowMs = this.config.respawnThresholdTime ?? 60000
// Prune old timestamps
workerInfo.restartTimestamps = workerInfo.restartTimestamps.filter(
(t) => now - t <= windowMs,
)
if (workerInfo.restartTimestamps.length >= threshold) {
return false
}

return true
}

private async restartAllWorkers(): Promise<void> {
this.logger?.info('Rolling restart of all workers initiated')

const workerIds = Array.from(this.workers.keys())

for (const workerId of workerIds) {
const current = this.workers.get(workerId)
if (!current) continue
// Spawn a replacement first (uses a new worker id) to minimize downtime
await this.spawnWorker()
// Give the new worker a brief moment to initialize
await new Promise((resolve) => setTimeout(resolve, 250))
this.logger?.info(`Stopping old worker ${workerId} (rolling restart)`)
current.isExiting = true
current.process.kill('SIGTERM')
// Small spacing to avoid thundering herd
await new Promise((resolve) => setTimeout(resolve, 250))
}
}

@@ -291,7 +332,13 @@ export class ClusterManager {
await Promise.race([shutdownPromise, timeoutPromise])

this.logger?.info('Cluster shutdown complete')
// Remove signal handlers to avoid memory leaks when embedding in long-running processes/tests
if (this.boundSigint) process.off('SIGINT', this.boundSigint)
if (this.boundSigterm) process.off('SIGTERM', this.boundSigterm)
if (this.boundSigusr2) process.off('SIGUSR2', this.boundSigusr2)
if (this.config.exitOnShutdown ?? true) {
process.exit(0)
}
}

getWorkerCount(): number {
return this.workers.size
}

@@ -301,4 +348,80 @@
getWorkerInfo(): WorkerInfo[] {
return Array.from(this.workers.values())
}

/** Whether the cluster has been started and not shut down */
isRunning(): boolean {
return this.started && !this.isShuttingDown
}

/**
* Dynamically scale the number of workers.
* Increases spawns or gracefully stops excess workers to match target.
*/
async scaleTo(target: number): Promise<void> {
if (!this.config.enabled) return
const desired = Math.max(1, Math.floor(target))
const current = this.workers.size
if (desired === current) return
if (desired > current) {
const toAdd = desired - current
this.logger?.info(`Scaling up workers: +${toAdd}`)
for (let i = 0; i < toAdd; i++) {
await this.spawnWorker()
}
} else {
const toRemove = current - desired
this.logger?.info(`Scaling down workers: -${toRemove}`)
const ids = Array.from(this.workers.keys()).slice(0, toRemove)
for (const id of ids) {
const info = this.workers.get(id)
if (!info) continue
info.isExiting = true
info.process.kill('SIGTERM')
}
}
this.config.workers = desired
}

/** Convenience: increase workers by N (default 1) */
scaleUp(by = 1): Promise<void> {
return this.scaleTo((this.workers.size || 0) + Math.max(1, by))
}

/** Convenience: decrease workers by N (default 1) */
scaleDown(by = 1): Promise<void> {
return this.scaleTo(Math.max(1, this.workers.size - Math.max(1, by)))
}

/** Broadcast a POSIX signal to all workers (e.g., 'SIGTERM', 'SIGHUP'). */
broadcastSignal(signal: NodeJS.Signals = 'SIGHUP'): void {
for (const [id, info] of this.workers) {
this.logger?.debug(`Sending ${signal} to worker ${id}`)
try {
info.process.kill(signal)
} catch (err) {
this.logger?.warn(
`Failed sending ${signal} to worker ${id}: ${(err as Error).message}`,
)
}
}
}

/** Send a signal to a single worker by id. */
sendSignalToWorker(
workerId: number,
signal: NodeJS.Signals = 'SIGHUP',
): boolean {
const info = this.workers.get(workerId)
if (!info) return false
try {
info.process.kill(signal)
return true
} catch (err) {
this.logger?.warn(
`Failed sending ${signal} to worker ${workerId}: ${(err as Error).message}`,
)
return false
}
}
}