Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions benchmark/delegate/_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ function createDelegate(filename, options, callback) {
return main;

function main(setup) {
require('#self/lib/logger').setSink(
require('#self/lib/logger').getPrettySink('benchmark.log')
);
const { LoggerFactory } = require('#self/lib/logger_factory');
LoggerFactory.init('benchmark.log');

const {
NoslatedDelegateService: DelegateService,
} = require('#self/delegate/index');
Expand Down Expand Up @@ -207,6 +207,7 @@ function createDelegate(filename, options, callback) {
}

function end() {
LoggerFactory.close();
delegate.resetPeer(credentials);
cp.removeAllListeners('exit');
cp.kill();
Expand Down
6 changes: 4 additions & 2 deletions bin/control_plane
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@

'use strict';

const Logger = require('#self/lib/logger');
const { LoggerFactory } = require('#self/lib/logger_factory');
const { ControlPlane } = require('#self/control_plane/index');

(async function() {
Logger.setSink(Logger.getPrettySink('control_plane.log'));
LoggerFactory.init('control_plane.log');

const controlPlane = new ControlPlane();
await controlPlane.ready();

const onSignal = async () => {
await controlPlane.close();
LoggerFactory.close();
process.exit(0);
};

process.on('SIGINT', onSignal);
process.on('SIGTERM', onSignal);
})().catch(err => {
console.error(err);
LoggerFactory.close();
process.exit(1);
});
6 changes: 4 additions & 2 deletions bin/data_plane
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@

'use strict';

const Logger = require('#self/lib/logger');
const { LoggerFactory } = require('#self/lib/logger_factory');
const { DataPlane } = require('#self/data_plane/index');

(async function() {
Logger.setSink(Logger.getPrettySink('data_plane.log'));
LoggerFactory.init('data_plane.log');
const dataPlane = new DataPlane();
await dataPlane.ready();

const onSignal = async () => {
await dataPlane.close();
LoggerFactory.close();
process.exit(0);
};

process.on('SIGINT', onSignal);
process.on('SIGTERM', onSignal);
})().catch(err => {
console.error(err);
LoggerFactory.close();
process.exit(1);
});
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"@datastructures-js/priority-queue": "6.1.1",
"@grpc/grpc-js": "1.8.8",
"@grpc/proto-loader": "^0.5.6",
"@midwayjs/logger": "^2.16.3",
"@midwayjs/logger": "^3.1.4",
"@opentelemetry/api": "^1.3.0",
"arg": "^5.0.0",
"bytes": "^3.1.0",
Expand Down
42 changes: 3 additions & 39 deletions src/control_plane/__test__/e2e/ema_scaling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { DefaultEnvironment } from '#self/test/env/environment';

/**
* 仅观测行为
* 在内部 ci 时因配置问题,容易触发 load 系统保护
*/
describe(common.testName(__filename), function () {
// Debug version of Node.js may take longer time to bootstrap.
Expand Down Expand Up @@ -40,7 +41,7 @@ describe(common.testName(__filename), function () {
},
]);

const sequence = [1, 1, 1, 10, 1, 6, 5, 0, 0, 0];
const sequence = [1, 1, 1, 5, 1, 0, 0, 0];

for (const concurrency of sequence) {
await makeConcurrencyRequest('aworker_echo_ema', concurrency, env);
Expand All @@ -52,7 +53,7 @@ describe(common.testName(__filename), function () {
});

async function request(functionName: string, env: DefaultEnvironment) {
const data = Buffer.from('1000');
const data = Buffer.from('100');

const response = await env.agent.invoke(functionName, data, {
method: 'POST',
Expand All @@ -74,40 +75,3 @@ function makeConcurrencyRequest(

return Promise.all(requests);
}

const WEIGHTS = {
0: 10,
1: 10,
2: 8,
3: 8,
4: 7,
5: 6,
6: 5,
7: 4,
8: 3,
9: 2,
10: 1,
};

function generateWeightedSequence(
length: number,
weights: Record<number, number>
): number[] {
const sequence = [];
const weightedValues = [];

// 构造根据权重扩展的值数组
for (const [value, weight] of Object.entries(weights)) {
for (let i = 0; i < weight; i++) {
weightedValues.push(parseInt(value));
}
}

// 生成序列
while (sequence.length < length) {
const randomIndex = Math.floor(Math.random() * weightedValues.length);
sequence.push(weightedValues[randomIndex]);
}

return sequence;
}
7 changes: 3 additions & 4 deletions src/control_plane/capacity_manager.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import bytes from 'bytes';
import { Base } from '#self/lib/sdk_base';
import loggers from '#self/lib/logger';
import { Logger } from '#self/lib/loggers';
import { Broker } from './worker_stats/broker';
import { RequestQueueingEvent } from './events';
import { ControlPlaneDependencyContext } from './deps';
import { StateManager } from './worker_stats/state_manager';
import { kMemoryLimit } from './constants';
import { RawWithDefaultsFunctionProfile } from '#self/lib/json/function_profile';
import { ErrorCode, LauncherError } from './worker_launcher_error_code';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';

enum WaterLevelAction {
UNKNOWN = 0,
Expand All @@ -24,7 +23,7 @@ export class CapacityManager extends Base {
private _shrinkRedundantTimes: number;
private _scalingStage: number;
private virtualMemoryPoolSize: number;
private logger: Logger;
private logger: PrefixedLogger;
private stateManager: StateManager;
private _useEmaScaling: boolean;

Expand All @@ -36,7 +35,7 @@ export class CapacityManager extends Base {
this._shrinkRedundantTimes =
config.controlPlane.workerRedundantVictimSpareTimes;
this._scalingStage = config.controlPlane.capacityScalingStage;
this.logger = loggers.get('capacity manager');
this.logger = LoggerFactory.prefix('capacity manager');
this._useEmaScaling = config.controlPlane.useEmaScaling;
}

Expand Down
6 changes: 3 additions & 3 deletions src/control_plane/code_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { promises as fs } from 'fs';
import path from 'path';
import util from 'util';

import loggers from '#self/lib/logger';
import * as naming from '#self/lib/naming';
import * as utils from '#self/lib/util';
import { ConfigContext } from './deps';
import { DependencyContext } from '#self/lib/dependency_context';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';

async function exists(filepath: string) {
let exists = true;
Expand All @@ -20,14 +20,14 @@ async function exists(filepath: string) {
}

export class CodeManager extends EventEmitter {
logger;
logger: PrefixedLogger;
map: Map<string, Promise<string> | boolean>;
workDir: string;

constructor(ctx: DependencyContext<ConfigContext>) {
super();
this.workDir = ctx.getInstance('config').dirs.noslatedWork;
this.logger = loggers.get('code manager');
this.logger = LoggerFactory.prefix('code manager');

this.map = new Map();
}
Expand Down
6 changes: 3 additions & 3 deletions src/control_plane/container/reconciler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Config } from '#self/config';
import { Clock } from '#self/lib/clock';
import { DependencyContext } from '#self/lib/dependency_context';
import { EventBus } from '#self/lib/event-bus';
import { Logger, loggers } from '#self/lib/loggers';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';
import { createDeferred } from '#self/lib/util';
import { ContainerReconciledEvent } from '../events';
import { ContainerManager } from './container_manager';
Expand All @@ -19,7 +19,7 @@ export class ContainerReconciler {
private _clock: Clock;
private _containerManager: ContainerManager;
private _eventBus: EventBus;
private _logger: Logger;
private _logger: PrefixedLogger;
private _interval: unknown | null = null;
private _closed = false;

Expand All @@ -31,7 +31,7 @@ export class ContainerReconciler {
ctx.getInstance('config').turf.reconcilingInterval;
this._containerManager = ctx.getInstance('containerManager');
this._eventBus = ctx.getInstance('eventBus');
this._logger = loggers.get('container reconciler');
this._logger = LoggerFactory.prefix('container reconciler');
}

ready() {
Expand Down
8 changes: 4 additions & 4 deletions src/control_plane/container/turf_container_manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import path from 'path';
import { Config } from '#self/config';
import { Logger, loggers } from '#self/lib/loggers';
import { createDeferred, Deferred, sleep } from '#self/lib/util';
import {
Container,
Expand All @@ -20,12 +19,13 @@ import {
import { DependencyContext } from '#self/lib/dependency_context';
import { ConfigContext } from '../deps';
import { TaskQueue } from '#self/lib/task_queue';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';

const TurfStopRetryableCodes = [TurfCode.EAGAIN];

export class TurfContainerManager implements ContainerManager {
private config: Config;
private logger: Logger;
private logger: PrefixedLogger;
private containers = new Map<string, TurfContainer>();
_cleanupQueue: TaskQueue<TurfContainer>;

Expand All @@ -34,7 +34,7 @@ export class TurfContainerManager implements ContainerManager {
constructor(ctx: DependencyContext<ConfigContext>) {
this.config = ctx.getInstance('config');
this.client = new Turf(this.config.turf.bin, this.config.turf.socketPath);
this.logger = loggers.get('turf-manager');
this.logger = LoggerFactory.prefix('turf-manager');

this._cleanupQueue = new TaskQueue(this._cleanup, {
concurrency: 1,
Expand Down Expand Up @@ -124,7 +124,7 @@ export class TurfContainerManager implements ContainerManager {

export class TurfContainer implements Container {
private client: Turf;
private logger: Logger;
private logger: PrefixedLogger;
pid?: number;
status: TurfContainerStates;
onstatuschanged = () => {};
Expand Down
7 changes: 3 additions & 4 deletions src/control_plane/control_plane.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { BaseOf } from '#self/lib/sdk_base';
import { dumpConfig } from '#self/config';
import loggers from '#self/lib/logger';
import { Logger } from '#self/lib/loggers';
import { EventEmitter } from 'events';
import { WorkerTelemetry } from './telemetry';
import { getMeter } from '#self/lib/telemetry/otel';
Expand All @@ -14,6 +12,7 @@ import { EventBus } from '#self/lib/event-bus';
import { StateManager } from './worker_stats/state_manager';
import { CodeManager } from './code_manager';
import { FunctionProfileUpdateEvent } from '#self/lib/function_profile';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';

/**
* ControlPlane
Expand All @@ -26,7 +25,7 @@ export class ControlPlane extends BaseOf(EventEmitter) {
public _ctx: ControlPlaneDependencyContext;

private _meter: Meter;
private _logger: Logger;
private _logger: PrefixedLogger;
private _workerTelemetry: WorkerTelemetry;
private _eventBus: EventBus;
private _stateManager: StateManager;
Expand All @@ -50,7 +49,7 @@ export class ControlPlane extends BaseOf(EventEmitter) {
this._eventBus
);

this._logger = loggers.get('control plane');
this._logger = LoggerFactory.prefix('control plane');

this._ctx
.getInstance('dataPlaneClientManager')
Expand Down
4 changes: 2 additions & 2 deletions src/control_plane/controllers/base_controller.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { ControlPlaneEvent } from '#self/lib/constants';
import { Logger } from '#self/lib/loggers';
import { Delta } from '../capacity_manager';
import { WorkerMetadata } from '../worker_stats/worker';
import { FunctionProfileManager } from '#self/lib/function_profile';
import { ContainerManager } from '../container/container_manager';
import { ControlPlaneDependencyContext } from '../deps';
import { WorkerLauncher } from '../worker_launcher';
import { StateManager } from '../worker_stats/state_manager';
import { PrefixedLogger } from '#self/lib/logger_factory';

export abstract class BaseController {
protected abstract logger: Logger;
protected abstract logger: PrefixedLogger;
protected _functionProfile: FunctionProfileManager;
protected _workerLauncher: WorkerLauncher;
protected _containerManager: ContainerManager;
Expand Down
7 changes: 3 additions & 4 deletions src/control_plane/controllers/default_controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import _ from 'lodash';
import loggers from '#self/lib/logger';
import { Logger } from '#self/lib/loggers';
import { ControlPlaneEvent } from '#self/lib/constants';
import { CapacityManager, Delta } from '../capacity_manager';
import {
Expand All @@ -19,9 +17,10 @@ import { Worker, WorkerMetadata } from '../worker_stats/worker';
import { ControlPlaneDependencyContext } from '../deps';
import { DataPlaneClientManager } from '../data_plane_client/manager';
import { ReservationController } from './reservation_controller';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';

export class DefaultController extends BaseController {
protected logger: Logger;
protected logger: PrefixedLogger;

private shrinking: boolean;
private _capacityManager: CapacityManager;
Expand All @@ -34,7 +33,7 @@ export class DefaultController extends BaseController {
this._reservationController = ctx.getInstance('reservationController');
this._dataPlaneClientManager = ctx.getInstance('dataPlaneClientManager');

this.logger = loggers.get('default controller');
this.logger = LoggerFactory.prefix('default controller');
this.shrinking = false;

const eventBus = ctx.getInstance('eventBus');
Expand Down
6 changes: 3 additions & 3 deletions src/control_plane/controllers/disposable_controller.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Logger, loggers } from '#self/lib/loggers';
import { WorkerStatusReportEvent } from '../events';
import { BaseController } from './base_controller';
import { ControlPlaneDependencyContext } from '../deps';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';

export class DisposableController extends BaseController {
logger: Logger;
protected logger: PrefixedLogger;

constructor(ctx: ControlPlaneDependencyContext) {
super(ctx);
this.logger = loggers.get('disposable controller');
this.logger = LoggerFactory.prefix('disposable controller');

const eventBus = ctx.getInstance('eventBus');
eventBus.subscribe(WorkerStatusReportEvent, {
Expand Down
6 changes: 3 additions & 3 deletions src/control_plane/controllers/reservation_controller.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Logger, loggers } from '#self/lib/loggers';
import { Delta } from '../capacity_manager';
import { BaseController } from './base_controller';
import { ControlPlaneDependencyContext } from '../deps';
import { LoggerFactory, PrefixedLogger } from '#self/lib/logger_factory';

export class ReservationController extends BaseController {
logger: Logger;
logger: PrefixedLogger;

constructor(ctx: ControlPlaneDependencyContext) {
super(ctx);
this.logger = loggers.get('reservation controller');
this.logger = LoggerFactory.prefix('reservation controller');
}

expand(deltas: Delta[]): Promise<void> {
Expand Down
Loading