diff --git a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json new file mode 100644 index 00000000000..12fe1d7e9c5 --- /dev/null +++ b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@microsoft/rush", + "comment": "Add an `allowOversubscription` option to the command definitions in `common/config/rush/command-line.json` to prevent running tasks from exceeding concurrency.", + "type": "none" + } + ], + "packageName": "@microsoft/rush" +} \ No newline at end of file diff --git a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json new file mode 100644 index 00000000000..6b24dd9f761 --- /dev/null +++ b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "Add an `allowOversubscription` option to the `Async` API functions which prevents running tasks from exceeding concurrency. Change its default to `false`.", + "type": "minor" + } + ], + "packageName": "@rushstack/node-core-library" +} \ No newline at end of file diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index a0fc83dccdc..b5d7827f474 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -237,6 +237,7 @@ export type FolderItem = nodeFs.Dirent; // @public export interface IAsyncParallelismOptions { + allowOversubscription?: boolean; concurrency?: number; weighted?: boolean; } diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 85d0b0fcdb0..9fb6bb9fd08 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -19,10 +19,28 @@ export interface IAsyncParallelismOptions { concurrency?: number; /** - * Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can - * take up more or less than one concurrency unit. + * Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an + * operation can take up more or less than one concurrency unit. */ weighted?: boolean; + + /** + * This option affects the handling of task weights, applying a softer policy that favors maximizing parallelism + * instead of avoiding overload. + * + * @remarks + * By default, a new task cannot start executing if doing so would push the total weight above the concurrency limit. + * Set `allowOversubscription` to true to relax this rule, allowing a new task to start as long as the current + * total weight is below the concurrency limit. Either way, a task cannot start if the total weight already equals + * the concurrency limit; therefore, `allowOversubscription` has no effect when all tasks have weight 1. + * + * Example: Suppose the concurrency limit is 8, and seven tasks are running whose weights are 1, so the current + * total weight is 7. If an available task has weight 2, that would push the total weight to 9, exceeding + * the limit. This task can start only if `allowOversubscription` is true. + * + * @defaultValue false + */ + allowOversubscription?: boolean; } /** @@ -201,6 +219,8 @@ export class Async { let arrayIndex: number = 0; let iteratorIsComplete: boolean = false; let promiseHasResolvedOrRejected: boolean = false; + // iterator that is stored when the loop exits early due to not enough concurrency + let nextIterator: IteratorResult | undefined = undefined; async function queueOperationsAsync(): Promise { while ( @@ -213,7 +233,7 @@ export class Async { // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; - const currentIteratorResult: IteratorResult = await iterator.next(); + const currentIteratorResult: IteratorResult = nextIterator ?? (await iterator.next()); // eslint-disable-next-line require-atomic-updates iteratorIsComplete = !!currentIteratorResult.done; @@ -225,9 +245,21 @@ export class Async { // Remove the "lock" from the concurrency check and only apply the current weight. // This should allow other operations to execute. - concurrentUnitsInProgress += weight; concurrentUnitsInProgress -= limitedConcurrency; + // Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` + const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency; + const allowOversubscription: boolean = options?.allowOversubscription ?? false; + if (!allowOversubscription && wouldExceedConcurrency) { + // eslint-disable-next-line require-atomic-updates + nextIterator = currentIteratorResult; + break; + } + + // eslint-disable-next-line require-atomic-updates + nextIterator = undefined; + concurrentUnitsInProgress += weight; + Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) .then(async () => { // Remove the operation completely from the in progress units. @@ -306,6 +338,7 @@ export class Async { * number of concurrency units that can be in progress at once. The weight of each operation * determines how many concurrency units it takes up. For example, if the concurrency is 2 * and the first operation has a weight of 2, then only one more operation can be in progress. + * Operations may exceed the concurrency limit based on the `allowOversubscription` option. * * If `callback` throws a synchronous exception, or if it returns a promise that rejects, * then the loop stops immediately. Any remaining array items will be skipped, and diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 5442723e8b2..c7c5468bc9d 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -3,6 +3,11 @@ import { Async, AsyncQueue } from '../Async'; +interface INumberWithWeight { + n: number; + weight: number; +} + describe(Async.name, () => { describe(Async.mapAsync.name, () => { it('handles an empty array correctly', async () => { @@ -27,13 +32,6 @@ describe(Async.name, () => { expect(fn).toHaveBeenNthCalledWith(3, 3, 2); }); - it('returns the same result as built-in Promise.all', async () => { - const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8]; - const fn: (item: number) => Promise = async (item) => `result ${item}`; - - expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn))); - }); - it('if concurrency is set, ensures no more than N operations occur in parallel', async () => { let running: number = 0; let maxRunning: number = 0; @@ -61,6 +59,31 @@ describe(Async.name, () => { expect(maxRunning).toEqual(3); }); + it('respects concurrency limit with allowOversubscription=false in mapAsync', async () => { + const array: INumberWithWeight[] = [ + { n: 1, weight: 2 }, + { n: 2, weight: 2 } + ]; + + let running = 0; + let maxRunning = 0; + + const result = await Async.mapAsync( + array, + async (item) => { + running++; + maxRunning = Math.max(maxRunning, running); + await Async.sleepAsync(0); + running--; + return `result-${item.n}`; + }, + { concurrency: 3, weighted: true, allowOversubscription: false } + ); + + expect(result).toEqual(['result-1', 'result-2']); + expect(maxRunning).toEqual(1); + }); + it('rejects if a sync iterator throws an error', async () => { const expectedError: Error = new Error('iterator error'); let iteratorIndex: number = 0; @@ -314,11 +337,6 @@ describe(Async.name, () => { ).rejects.toThrow(expectedError); }); - interface INumberWithWeight { - n: number; - weight: number; - } - it('handles an empty array correctly', async () => { let running: number = 0; let maxRunning: number = 0; @@ -469,7 +487,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: true }); expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(2); }); @@ -542,6 +560,10 @@ describe(Async.name, () => { }); describe(Async.runWithRetriesAsync.name, () => { + afterEach(() => { + jest.restoreAllMocks(); + }); + it('Correctly handles a sync function that succeeds the first time', async () => { const expectedResult: string = 'RESULT'; const result: string = await Async.runWithRetriesAsync({ action: () => expectedResult, maxRetries: 0 }); @@ -688,6 +710,142 @@ describe(Async.name, () => { expect(sleepSpy).toHaveBeenCalledTimes(1); expect(sleepSpy).toHaveBeenLastCalledWith(5); }); + + describe('allowOversubscription=false operations', () => { + it.each([ + { + concurrency: 4, + weight: 4, + expectedConcurrency: 1, + numberOfTasks: 4 + }, + { + concurrency: 4, + weight: 1, + expectedConcurrency: 4, + numberOfTasks: 4 + }, + { + concurrency: 4, + weight: 5, + expectedConcurrency: 1, + numberOfTasks: 2 + } + ])( + 'enforces strict concurrency limits when allowOversubscription=false: concurrency=$concurrency, weight=$weight, expects max $expectedConcurrency concurrent operations', + async ({ concurrency, weight, expectedConcurrency, numberOfTasks }) => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = Array.from({ length: numberOfTasks }, (v, i) => i).map((n) => ({ + n, + weight + })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async () => { + running++; + await Async.sleepAsync(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false }); + expect(fn).toHaveBeenCalledTimes(numberOfTasks); + expect(maxRunning).toEqual(expectedConcurrency); + } + ); + + it('waits for a small and large operation to finish before scheduling more', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 10 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10 }, + { n: 5, weight: 1 }, + { n: 6, weight: 10 }, + { n: 7, weight: 1 }, + { n: 8, weight: 10 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleepAsync(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(1); + }); + + it('handles operation with mixed weights', async () => { + const concurrency: number = 3; + let running: number = 0; + let maxRunning: number = 0; + const taskToMaxConcurrency: Record = {}; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 2 }, + { n: 3, weight: concurrency }, + { n: 4, weight: 1 }, + { n: 5, weight: 1 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + taskToMaxConcurrency[item.n] = running; + await Async.sleepAsync(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false }); + expect(fn).toHaveBeenCalledTimes(5); + expect(maxRunning).toEqual(2); + + expect(taskToMaxConcurrency[1]).toEqual(1); // task 1 + expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2 + expect(taskToMaxConcurrency[3]).toEqual(1); // task 3 + expect(taskToMaxConcurrency[4]).toEqual(1); // task 4 + expect(taskToMaxConcurrency[5]).toEqual(2); // task 4 + 5 + }); + + it('allows operations with weight 0 to be picked up when system is at max concurrency', async () => { + let running: number = 0; + let maxRunning: number = 0; + const taskToMaxConcurrency: Record = {}; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 0 }, + { n: 3, weight: 3 }, + { n: 4, weight: 1 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + taskToMaxConcurrency[item.n] = running; + maxRunning = Math.max(maxRunning, running); + await Async.sleepAsync(0); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false }); + + expect(fn).toHaveBeenCalledTimes(4); + expect(maxRunning).toEqual(2); + + expect(taskToMaxConcurrency[1]).toEqual(1); // task 1 + expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2 + expect(taskToMaxConcurrency[3]).toEqual(2); // task 2 + 3 + expect(taskToMaxConcurrency[4]).toEqual(1); // task 4 + }); + }); }); }); diff --git a/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json b/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json index 8b972ba7734..3760029c00d 100644 --- a/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json +++ b/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json @@ -77,6 +77,14 @@ */ "enableParallelism": false, + /** + * Controls whether weighted operations can start when the total weight would exceed the limit + * but is currently below the limit. This setting only applies when "enableParallelism" is true + * and operations have a "weight" property configured in their rush-project.json "operationSettings". + * Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit. + */ + "allowOversubscription": false, + /** * Normally projects will be processed according to their dependency order: a given project will not start * processing the command until all of its dependencies have completed. This restriction doesn't apply for diff --git a/libraries/rush-lib/src/api/CommandLineJson.ts b/libraries/rush-lib/src/api/CommandLineJson.ts index ba4f412176f..e6507e49633 100644 --- a/libraries/rush-lib/src/api/CommandLineJson.ts +++ b/libraries/rush-lib/src/api/CommandLineJson.ts @@ -23,6 +23,7 @@ export interface IBaseCommandJson { export interface IBulkCommandJson extends IBaseCommandJson { commandKind: 'bulk'; enableParallelism: boolean; + allowOversubscription?: boolean; ignoreDependencyOrder?: boolean; ignoreMissingScript?: boolean; incremental?: boolean; @@ -38,6 +39,7 @@ export interface IBulkCommandJson extends IBaseCommandJson { export interface IPhasedCommandWithoutPhasesJson extends IBaseCommandJson { commandKind: 'phased'; enableParallelism: boolean; + allowOversubscription?: boolean; incremental?: boolean; } diff --git a/libraries/rush-lib/src/cli/RushCommandLineParser.ts b/libraries/rush-lib/src/cli/RushCommandLineParser.ts index d21b637f931..19e6a095f34 100644 --- a/libraries/rush-lib/src/cli/RushCommandLineParser.ts +++ b/libraries/rush-lib/src/cli/RushCommandLineParser.ts @@ -471,6 +471,11 @@ export class RushCommandLineParser extends CommandLineParser { incremental: command.incremental || false, disableBuildCache: command.disableBuildCache || false, + // The Async.forEachAsync() API defaults allowOversubscription=false, whereas Rush historically + // defaults allowOversubscription=true to favor faster builds rather than strictly staying below + // the CPU limit. + allowOversubscription: command.allowOversubscription ?? true, + initialPhases: command.phases, originalPhases: command.originalPhases, watchPhases: command.watchPhases, diff --git a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts index 038e2386f31..906a3ca89ea 100644 --- a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts +++ b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts @@ -68,6 +68,7 @@ const PERF_PREFIX: 'rush:phasedScriptAction' = 'rush:phasedScriptAction'; */ export interface IPhasedScriptActionOptions extends IBaseScriptActionOptions { enableParallelism: boolean; + allowOversubscription: boolean; incremental: boolean; disableBuildCache: boolean; @@ -140,6 +141,7 @@ export class PhasedScriptAction extends BaseScriptAction i public readonly sessionAbortController: AbortController; private readonly _enableParallelism: boolean; + private readonly _allowOversubscription: boolean; private readonly _isIncrementalBuildAllowed: boolean; private readonly _disableBuildCache: boolean; private readonly _originalPhases: ReadonlySet; @@ -171,6 +173,7 @@ export class PhasedScriptAction extends BaseScriptAction i public constructor(options: IPhasedScriptActionOptions) { super(options); this._enableParallelism = options.enableParallelism; + this._allowOversubscription = options.allowOversubscription; this._isIncrementalBuildAllowed = options.incremental; this._disableBuildCache = options.disableBuildCache; this._originalPhases = options.originalPhases; @@ -583,6 +586,7 @@ export class PhasedScriptAction extends BaseScriptAction i quietMode: isQuietMode, debugMode: this.parser.isDebug, parallelism, + allowOversubscription: this._allowOversubscription, beforeExecuteOperationAsync: async (record: OperationExecutionRecord) => { return await this.hooks.beforeExecuteOperation.promise(record); }, diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index b88211eabb7..b0f2a3e0cb2 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -25,6 +25,7 @@ export interface IOperationExecutionManagerOptions { quietMode: boolean; debugMode: boolean; parallelism: number; + allowOversubscription: boolean; inputsSnapshot?: IInputsSnapshot; destination?: TerminalWritable; @@ -69,6 +70,7 @@ export class OperationExecutionManager { private readonly _executionRecords: Map; private readonly _quietMode: boolean; private readonly _parallelism: number; + private readonly _allowOversubscription: boolean; private readonly _totalOperations: number; private readonly _outputWritable: TerminalWritable; @@ -99,6 +101,7 @@ export class OperationExecutionManager { quietMode, debugMode, parallelism, + allowOversubscription, inputsSnapshot, beforeExecuteOperationAsync: beforeExecuteOperation, afterExecuteOperationAsync: afterExecuteOperation, @@ -112,6 +115,7 @@ export class OperationExecutionManager { this._hasAnyNonAllowedWarnings = false; this._hasAnyAborted = false; this._parallelism = parallelism; + this._allowOversubscription = allowOversubscription; this._beforeExecuteOperation = beforeExecuteOperation; this._afterExecuteOperation = afterExecuteOperation; @@ -304,6 +308,7 @@ export class OperationExecutionManager { } }, { + allowOversubscription: this._allowOversubscription, concurrency: maxParallelism, weighted: true } diff --git a/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts b/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts index d86f2bc7f7b..b8168a8e232 100644 --- a/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts @@ -103,6 +103,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable }; }); @@ -185,6 +186,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable } ); @@ -229,6 +231,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable } ); @@ -250,6 +253,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable }; }); @@ -287,6 +291,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable }; }); diff --git a/libraries/rush-lib/src/schemas/command-line.schema.json b/libraries/rush-lib/src/schemas/command-line.schema.json index dd2655f30b2..0091e7bb7ae 100644 --- a/libraries/rush-lib/src/schemas/command-line.schema.json +++ b/libraries/rush-lib/src/schemas/command-line.schema.json @@ -66,6 +66,11 @@ "description": "If true then this command can be run in parallel, i.e. executed simultaneously for multiple projects.", "type": "boolean" }, + "allowOversubscription": { + "title": "allowOversubscription", + "type": "boolean", + "description": "Controls whether weighted operations can start when the total weight would exceed the limit but is currently below the limit. This setting only applies when \"enableParallelism\" is true and operations have a \"weight\" property configured in their rush-project.json \"operationSettings\". Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit." + }, "ignoreDependencyOrder": { "title": "ignoreDependencyOrder", "description": "Normally projects will be processed according to their dependency order: a given project will not start processing the command until all of its dependencies have completed. This restriction doesn't apply for certain operations, for example, a \"clean\" task that deletes output files. In this case you can set \"ignoreDependencyOrder\" to true to increase parallelism.", @@ -110,6 +115,7 @@ "shellCommand": { "$ref": "#/definitions/anything" }, "enableParallelism": { "$ref": "#/definitions/anything" }, + "allowOversubscription": { "$ref": "#/definitions/anything" }, "ignoreDependencyOrder": { "$ref": "#/definitions/anything" }, "ignoreMissingScript": { "$ref": "#/definitions/anything" }, "incremental": { "$ref": "#/definitions/anything" }, @@ -181,6 +187,11 @@ "description": "If true then this command can be run in parallel, i.e. executed simultaneously for multiple projects.", "type": "boolean" }, + "allowOversubscription": { + "title": "allowOversubscription", + "type": "boolean", + "description": "Controls whether weighted operations can start when the total weight would exceed the limit but is currently below the limit. This setting only applies when \"enableParallelism\" is true and operations have a \"weight\" property configured in their rush-project.json \"operationSettings\". Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit." + }, "incremental": { "title": "Incremental", "description": "If true then this command's phases will be incremental and support caching.", @@ -253,6 +264,7 @@ "safeForSimultaneousRushProcesses": { "$ref": "#/definitions/anything" }, "enableParallelism": { "$ref": "#/definitions/anything" }, + "allowOversubscription": { "$ref": "#/definitions/anything" }, "incremental": { "$ref": "#/definitions/anything" }, "phases": { "$ref": "#/definitions/anything" }, "watchOptions": { "$ref": "#/definitions/anything" },