diff --git a/proto/noslated/control-plane.proto b/proto/noslated/control-plane.proto index 460b051..9886b53 100644 --- a/proto/noslated/control-plane.proto +++ b/proto/noslated/control-plane.proto @@ -13,6 +13,7 @@ service ControlPlane { message WorkerAdditionalData { int32 activeRequestCount = 2; bool trafficOff = 3; + int32 accumulatedRequestCount = 4; } message GetFunctionProfileResponse { diff --git a/src/control_plane/capacity_manager.ts b/src/control_plane/capacity_manager.ts index d9cbd58..d8f6ed5 100644 --- a/src/control_plane/capacity_manager.ts +++ b/src/control_plane/capacity_manager.ts @@ -232,6 +232,16 @@ export class CapacityManager extends Base { ) { deltaInstance = broker.activeWorkerCount - broker.reservationCount; } + + // 如果周期内有请求,则不缩容到 0,改为缩容到 1 + if (broker.getAccumulatedRequestCount() > 0 && broker.activeWorkerCount - deltaInstance === 0) { + broker.redundantTimes = 0; + if (broker.activeWorkerCount > 1) { + return 1 - broker.activeWorkerCount; + } else { + return 0; + } + } broker.redundantTimes = 0; return -deltaInstance; diff --git a/src/control_plane/worker_stats/broker.ts b/src/control_plane/worker_stats/broker.ts index 75fcf93..d6a97f8 100644 --- a/src/control_plane/worker_stats/broker.ts +++ b/src/control_plane/worker_stats/broker.ts @@ -134,6 +134,16 @@ class Broker { return a; } + getAccumulatedRequestCount() { + let a = 0; + for (const worker of this.workers.values()) { + if (!worker.isActive()) continue; + a += worker.data?.accumulatedRequestCount || 0; + } + + return a; + } + /** * Get worker. * @param processName The process name (worker name). diff --git a/src/control_plane/worker_stats/worker.ts b/src/control_plane/worker_stats/worker.ts index 2937d73..ba2bfbf 100644 --- a/src/control_plane/worker_stats/worker.ts +++ b/src/control_plane/worker_stats/worker.ts @@ -15,9 +15,11 @@ export type WorkerStats = noslated.data.IWorkerStats; class WorkerAdditionalData { activeRequestCount; + accumulatedRequestCount; constructor(data: WorkerStats) { this.activeRequestCount = data.activeRequestCount; + this.accumulatedRequestCount = data.accumulatedRequestCount; } } diff --git a/src/data_plane/worker_broker.ts b/src/data_plane/worker_broker.ts index 978a658..6f520bc 100644 --- a/src/data_plane/worker_broker.ts +++ b/src/data_plane/worker_broker.ts @@ -89,6 +89,7 @@ export class PendingRequest extends EventEmitter { export class Worker extends EventEmitter { activeRequestCount: number; + accumulatedRequestCount: number; private logger: PrefixedLogger; trafficOff: boolean; @@ -105,6 +106,7 @@ export class Worker extends EventEmitter { ) { super(); this.activeRequestCount = 0; + this.accumulatedRequestCount = 0; this.logger = new PrefixedLogger('worker', this.name); // + if `trafficOff` is `false`, then traffic may in; @@ -140,6 +142,10 @@ export class Worker extends EventEmitter { return promise; } + zeroAccumulatedRequestCount() { + this.accumulatedRequestCount = 0; + } + /** * Pipe input stream to worker process and get response. */ @@ -172,6 +178,7 @@ export class Worker extends EventEmitter { } this.activeRequestCount++; + this.accumulatedRequestCount++; this.logger.info( '[%s] Dispatching request, activeRequestCount: %s, wait: %sms.', requestId, @@ -473,10 +480,16 @@ export class WorkerBroker extends Base implements DispatcherDelegate { return { functionName: this.name, inspector: this.options.inspect === true, - workers: Array.from(this._workerMap.values()).map(item => ({ - name: item.name, - activeRequestCount: item.worker?.activeRequestCount ?? 0, - })), + workers: Array.from(this._workerMap.values()).map(item => { + const data = { + name: item.name, + activeRequestCount: item.worker?.activeRequestCount ?? 0, + accumulatedRequestCount: item.worker?.accumulatedRequestCount ?? 0, + }; + // 读出累计数据后直接清空 + item.worker?.zeroAccumulatedRequestCount(); + return data; + }), }; }