Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/noslated/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ message FunctionProfileWorker {
float precisionZeroThreshold = 21;
string concurrencyStatsMode = 22;
bool shrinkCooldownOnStartup = 23;
bool resetShrinkCooldownOnRequestActive = 24;
}

message PlaneHealthyResponse {
Expand Down
1 change: 1 addition & 0 deletions src/config/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export default {
precisionZeroThreshold: 0.01,
concurrencyStatsMode: ConcurrencyStatsMode.INSTANT,
shrinkCooldownOnStartup: true,
resetShrinkCooldownOnRequestActive: false,
},
starter: {
aworker: {
Expand Down
2 changes: 2 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ export interface WorkerDefaultConfig {
concurrencyStatsMode?: ConcurrencyStatsMode;
// 启动后是否进入缩容冷却期,默认为 true
shrinkCooldownOnStartup?: boolean;
// 请求激活后是否重置缩容冷却期,默认为 false
resetShrinkCooldownOnRequestActive?: boolean;
}

export interface StarterConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import * as common from '#self/test/common';
import { baselineDir } from '#self/test/common';
import assert from 'assert';
import { bufferFromStream, sleep } from '#self/lib/util';
import { DefaultEnvironment } from '#self/test/env/environment';
import { ConcurrencyStatsMode } from '#self/lib/json/function_profile';

describe(common.testName(__filename), function () {
// Debug version of Node.js may take longer time to bootstrap.
this.timeout(30_000);

const env = new DefaultEnvironment({
createTestClock: true,
config: common.extendDefaultConfig({
virtualMemoryPoolSize: '4gb',
controlPlane: {
useEmaScaling: true,
workerTrafficStatsPullingMs: 1000,
},
}),
});

it('should reset shrink down when resetShrinkCooldownOnRequestActive=true', async () => {
await env.agent.setFunctionProfile([
{
name: 'aworker_echo_ema',
runtime: 'aworker',
url: `file://${baselineDir}/aworker_echo`,
sourceFile: 'index.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
resetShrinkCooldownOnRequestActive: true,
shrinkCooldown: 5_000,
concurrencyStatsMode: ConcurrencyStatsMode.PERIODIC_AVG,
},
resourceLimit: {
memory: 200 * 1024 * 1024,
},
},
]);

const statManager = env.control._ctx.getInstance('stateManager');

await request('aworker_echo_ema', env);

await sleep(1000);

await request('aworker_echo_ema', env);

await sleep(5000);

const brokerBefore = statManager.getBroker('aworker_echo_ema', false);

assert(brokerBefore?.workerCount === 1);

await sleep(5000);

const brokerAfter = statManager.getBroker('aworker_echo_ema', false);
assert(brokerAfter == null);
});

it('should reset shrink down when resetShrinkCooldownOnRequestActive=true and shrinkCooldownOnStartup=false', async () => {
await env.agent.setFunctionProfile([
{
name: 'aworker_echo_ema',
runtime: 'aworker',
url: `file://${baselineDir}/aworker_echo`,
sourceFile: 'index.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
resetShrinkCooldownOnRequestActive: true,
shrinkCooldownOnStartup: false,
shrinkCooldown: 5_000,
concurrencyStatsMode: ConcurrencyStatsMode.PERIODIC_AVG,
},
resourceLimit: {
memory: 200 * 1024 * 1024,
},
},
]);

const statManager = env.control._ctx.getInstance('stateManager');

await request('aworker_echo_ema', env);

await sleep(1000);

await request('aworker_echo_ema', env);

await sleep(5000);

const brokerBefore = statManager.getBroker('aworker_echo_ema', false);

assert(brokerBefore?.workerCount === 1);

await sleep(5000);

const brokerAfter = statManager.getBroker('aworker_echo_ema', false);
assert(brokerAfter == null);
});
});

async function request(functionName: string, env: DefaultEnvironment) {
const data = Buffer.from('200');

const response = await env.agent.invoke(functionName, data, {
method: 'POST',
});

assert.strictEqual(response.status, 200);

return await bufferFromStream(response);
}
8 changes: 8 additions & 0 deletions src/control_plane/worker_stats/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ class Broker {
return this.#profile.worker.shrinkCooldownOnStartup ?? true;
}

get resetShrinkCooldownOnRequestActive() {
return this.#profile.worker.resetShrinkCooldownOnRequestActive ?? false;
}

isExpandCooldown(now: number) {
return now - this._lastExpandTime < this.expandCooldown;
}
Expand Down Expand Up @@ -213,6 +217,10 @@ class Broker {
recalculateConcurrency(concurrency: number) {
if (!this._useEmaScaling) return;

if (concurrency > 0 && this.resetShrinkCooldownOnRequestActive) {
this.resetShrinkCooldownTime(Date.now());
}

this._emaConcurrency!.recalculate(concurrency);
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib/json/function_profile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ export interface ProcessFunctionProfile {
concurrencyStatsMode?: ConcurrencyStatsMode;
// 启动后是否进入缩容冷却期,默认为 true
shrinkCooldownOnStartup?: boolean;
// 请求激活后是否重置缩容冷却期,默认为 false
resetShrinkCooldownOnRequestActive?: boolean;
};
environments?: {
key: string;
Expand Down
4 changes: 4 additions & 0 deletions src/lib/json/function_profile_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@
"shrinkCooldownOnStartup": {
"type": "boolean",
"description": "whether shrink cooldown on worker startup"
},
"resetShrinkCooldownOnRequestActive": {
"type": "boolean",
"description": "whether reset shrink cooldown on request active"
}
},
"additionalProperties": true
Expand Down