From 617da5b4047e1ce62fbf26a5ace7acc3942575b8 Mon Sep 17 00:00:00 2001 From: Hongcai Deng Date: Thu, 28 Sep 2023 18:06:06 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20=F0=9F=90=9B=20should-record-active-?= =?UTF-8?q?request-in-a-duration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- proto/noslated/control-plane.proto | 1 + src/control_plane/capacity_manager.ts | 6 ++++++ src/control_plane/worker_stats/broker.ts | 10 ++++++++++ src/control_plane/worker_stats/worker.ts | 2 ++ src/data_plane/worker_broker.ts | 21 +++++++++++++++++---- 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/proto/noslated/control-plane.proto b/proto/noslated/control-plane.proto index 460b051..9886b53 100644 --- a/proto/noslated/control-plane.proto +++ b/proto/noslated/control-plane.proto @@ -13,6 +13,7 @@ service ControlPlane { message WorkerAdditionalData { int32 activeRequestCount = 2; bool trafficOff = 3; + int32 accumulatedRequestCount = 4; } message GetFunctionProfileResponse { diff --git a/src/control_plane/capacity_manager.ts b/src/control_plane/capacity_manager.ts index d9cbd58..dc16baf 100644 --- a/src/control_plane/capacity_manager.ts +++ b/src/control_plane/capacity_manager.ts @@ -232,6 +232,12 @@ export class CapacityManager extends Base { ) { deltaInstance = broker.activeWorkerCount - broker.reservationCount; } + + // 如果周期内有请求,则不扩容到 0 + if (broker.getAccumulatedRequestCount() > 0 && broker.activeWorkerCount - deltaInstance === 0) { + broker.redundantTimes = 0; + return 0; + } broker.redundantTimes = 0; return -deltaInstance; diff --git a/src/control_plane/worker_stats/broker.ts b/src/control_plane/worker_stats/broker.ts index 75fcf93..d6a97f8 100644 --- a/src/control_plane/worker_stats/broker.ts +++ b/src/control_plane/worker_stats/broker.ts @@ -134,6 +134,16 @@ class Broker { return a; } + getAccumulatedRequestCount() { + let a = 0; + for (const worker of this.workers.values()) { + if (!worker.isActive()) continue; + a += worker.data?.accumulatedRequestCount || 0; + } + + return a; + } + /** * Get worker. * @param processName The process name (worker name). diff --git a/src/control_plane/worker_stats/worker.ts b/src/control_plane/worker_stats/worker.ts index 2937d73..ba2bfbf 100644 --- a/src/control_plane/worker_stats/worker.ts +++ b/src/control_plane/worker_stats/worker.ts @@ -15,9 +15,11 @@ export type WorkerStats = noslated.data.IWorkerStats; class WorkerAdditionalData { activeRequestCount; + accumulatedRequestCount; constructor(data: WorkerStats) { this.activeRequestCount = data.activeRequestCount; + this.accumulatedRequestCount = data.accumulatedRequestCount; } } diff --git a/src/data_plane/worker_broker.ts b/src/data_plane/worker_broker.ts index 978a658..6f520bc 100644 --- a/src/data_plane/worker_broker.ts +++ b/src/data_plane/worker_broker.ts @@ -89,6 +89,7 @@ export class PendingRequest extends EventEmitter { export class Worker extends EventEmitter { activeRequestCount: number; + accumulatedRequestCount: number; private logger: PrefixedLogger; trafficOff: boolean; @@ -105,6 +106,7 @@ export class Worker extends EventEmitter { ) { super(); this.activeRequestCount = 0; + this.accumulatedRequestCount = 0; this.logger = new PrefixedLogger('worker', this.name); // + if `trafficOff` is `false`, then traffic may in; @@ -140,6 +142,10 @@ export class Worker extends EventEmitter { return promise; } + zeroAccumulatedRequestCount() { + this.accumulatedRequestCount = 0; + } + /** * Pipe input stream to worker process and get response. */ @@ -172,6 +178,7 @@ export class Worker extends EventEmitter { } this.activeRequestCount++; + this.accumulatedRequestCount++; this.logger.info( '[%s] Dispatching request, activeRequestCount: %s, wait: %sms.', requestId, @@ -473,10 +480,16 @@ export class WorkerBroker extends Base implements DispatcherDelegate { return { functionName: this.name, inspector: this.options.inspect === true, - workers: Array.from(this._workerMap.values()).map(item => ({ - name: item.name, - activeRequestCount: item.worker?.activeRequestCount ?? 0, - })), + workers: Array.from(this._workerMap.values()).map(item => { + const data = { + name: item.name, + activeRequestCount: item.worker?.activeRequestCount ?? 0, + accumulatedRequestCount: item.worker?.accumulatedRequestCount ?? 0, + }; + // 读出累计数据后直接清空 + item.worker?.zeroAccumulatedRequestCount(); + return data; + }), }; } From 81db03969406a71b58555f08d0e605d2065b0b56 Mon Sep 17 00:00:00 2001 From: Hongcai Deng Date: Thu, 28 Sep 2023 18:21:05 +0800 Subject: [PATCH 2/2] fff --- src/control_plane/capacity_manager.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/control_plane/capacity_manager.ts b/src/control_plane/capacity_manager.ts index dc16baf..d8f6ed5 100644 --- a/src/control_plane/capacity_manager.ts +++ b/src/control_plane/capacity_manager.ts @@ -233,10 +233,14 @@ export class CapacityManager extends Base { deltaInstance = broker.activeWorkerCount - broker.reservationCount; } - // 如果周期内有请求,则不扩容到 0 + // 如果周期内有请求,则不缩容到 0,改为缩容到 1 if (broker.getAccumulatedRequestCount() > 0 && broker.activeWorkerCount - deltaInstance === 0) { broker.redundantTimes = 0; - return 0; + if (broker.activeWorkerCount > 1) { + return 1 - broker.activeWorkerCount; + } else { + return 0; + } } broker.redundantTimes = 0;