From 3e4fc31e2fa8f40ccad97541da96816fe850d42a Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Wed, 10 Sep 2025 18:13:46 -0400 Subject: [PATCH 01/15] Fix concurrency bug for max weighted operation scheduling --- libraries/node-core-library/src/Async.ts | 21 +++- .../node-core-library/src/test/Async.test.ts | 109 ++++++++++++++++-- 2 files changed, 119 insertions(+), 11 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 85d0b0fcdb0..39940c7a806 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -201,6 +201,7 @@ export class Async { let arrayIndex: number = 0; let iteratorIsComplete: boolean = false; let promiseHasResolvedOrRejected: boolean = false; + let nextIterator: IteratorResult | undefined = undefined; async function queueOperationsAsync(): Promise { while ( @@ -213,7 +214,7 @@ export class Async { // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; - const currentIteratorResult: IteratorResult = await iterator.next(); + const currentIteratorResult: IteratorResult = nextIterator || (await iterator.next()); // eslint-disable-next-line require-atomic-updates iteratorIsComplete = !!currentIteratorResult.done; @@ -225,9 +226,25 @@ export class Async { // Remove the "lock" from the concurrency check and only apply the current weight. // This should allow other operations to execute. - concurrentUnitsInProgress += weight; concurrentUnitsInProgress -= limitedConcurrency; + // Wait until there's have enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` + const weightWithPeekedIsOverConcurrency: boolean = + concurrentUnitsInProgress + weight > concurrency; + const currentUnitsIsZero: boolean = concurrentUnitsInProgress === 0; + const taskWeightIsZero: boolean = weight === 0; + if (weightWithPeekedIsOverConcurrency && !currentUnitsIsZero && !taskWeightIsZero) { + // eslint-disable-next-line require-atomic-updates + nextIterator = currentIteratorResult; + break; + } else { + // clear iterator + // eslint-disable-next-line require-atomic-updates + nextIterator = undefined; + } + + concurrentUnitsInProgress += weight; + Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) .then(async () => { // Remove the operation completely from the in progress units. diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 5442723e8b2..64c32fb8097 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -27,13 +27,6 @@ describe(Async.name, () => { expect(fn).toHaveBeenNthCalledWith(3, 3, 2); }); - it('returns the same result as built-in Promise.all', async () => { - const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8]; - const fn: (item: number) => Promise = async (item) => `result ${item}`; - - expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn))); - }); - it('if concurrency is set, ensures no more than N operations occur in parallel', async () => { let running: number = 0; let maxRunning: number = 0; @@ -447,7 +440,7 @@ describe(Async.name, () => { expect(maxRunning).toEqual(3); }); - it('waits for a large operation to finish before scheduling more', async () => { + it.only('waits for a small and large operation to finish before scheduling more', async () => { let running: number = 0; let maxRunning: number = 0; @@ -471,7 +464,7 @@ describe(Async.name, () => { await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); expect(fn).toHaveBeenCalledTimes(8); - expect(maxRunning).toEqual(2); + expect(maxRunning).toEqual(1); }); it('allows operations with a weight of 0 and schedules them accordingly', async () => { @@ -496,6 +489,104 @@ describe(Async.name, () => { expect(maxRunning).toEqual(9); }); + it('ensures isolated job runs in isolation while small jobs never run alongside it', async () => { + const maxConcurrency: number = 10; + let running: number = 0; + const jobToMaxConcurrentJobsRunning: Record = {}; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 1 }, + { n: 3, weight: 1 }, + { n: 4, weight: maxConcurrency }, + { n: 5, weight: 1 }, + { n: 6, weight: 1 }, + { n: 7, weight: 1 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); + + // Simulate longer running time for heavyweight job + if (item.weight === maxConcurrency) { + await Async.sleepAsync(50); + } else { + await Async.sleepAsync(10); + } + + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: maxConcurrency, weighted: true }); + + expect(fn).toHaveBeenCalledTimes(7); + + // The heavyweight job (n=4) should run with only 1 concurrent job (itself) + expect(jobToMaxConcurrentJobsRunning[4]).toEqual(1); + + // Small jobs should be able to run concurrently with each other but not with heavyweight job + const nonIsolatedJobs = array.filter((job) => job.weight !== maxConcurrency); + nonIsolatedJobs.forEach((job) => { + expect(jobToMaxConcurrentJobsRunning[job.n]).toBeGreaterThanOrEqual(1); + expect(jobToMaxConcurrentJobsRunning[job.n]).toBeLessThanOrEqual(6); // All small jobs could theoretically run together + }); + }); + + it('allows zero weight tasks to run alongside weight = concurrency task', async () => { + const concurrency = 3; + const array: INumberWithWeight[] = [ + { n: 1, weight: 0 }, + { n: 2, weight: concurrency }, + { n: 3, weight: 0 } + ]; + + let running: number = 0; + const jobToMaxConcurrentJobsRunning: Record = {}; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); + + await Async.sleepAsync(0); + + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: concurrency, weighted: true }); + + expect(jobToMaxConcurrentJobsRunning[1]).toEqual(1); // runs 0 weight + expect(jobToMaxConcurrentJobsRunning[2]).toEqual(2); // runs 0 weight + 3 weight + expect(jobToMaxConcurrentJobsRunning[3]).toEqual(2); // runs 0 weight + 3 weight + }); + + it('allows zero weight tasks to run alongside weight > concurrency task', async () => { + const concurrency = 3; + const array: INumberWithWeight[] = [ + { n: 1, weight: 0 }, + { n: 2, weight: concurrency + 1 }, + { n: 3, weight: 0 } + ]; + + let running: number = 0; + const jobToMaxConcurrentJobsRunning: Record = {}; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); + + await Async.sleepAsync(0); + + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency, weighted: true }); + + expect(jobToMaxConcurrentJobsRunning[1]).toEqual(1); // runs 0 weight + expect(jobToMaxConcurrentJobsRunning[2]).toEqual(2); // runs 0 weight + 4 weight + expect(jobToMaxConcurrentJobsRunning[3]).toEqual(2); // runs 0 weight + 3 weight + }); + it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => { let waitingIterators: number = 0; From 534076ed8f5af04ce6ecf5b6a1f9f2d088dea0bf Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Wed, 10 Sep 2025 19:12:43 -0400 Subject: [PATCH 02/15] create Peekable iterator --- libraries/node-core-library/src/Async.ts | 60 ++++++++++++++++-------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 39940c7a806..c46e6343bae 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -70,6 +70,31 @@ export interface IRunWithTimeoutOptions { timeoutMessage?: string; } +class PeekableIterator { + private _peekedResult: IteratorResult | undefined = undefined; + private readonly _iterator: Iterator | AsyncIterator; + + public constructor(iterator: Iterator | AsyncIterator) { + this._iterator = iterator; + } + + public async peekAsync(): Promise> { + if (this._peekedResult === undefined) { + this._peekedResult = await this._iterator.next(); + } + return this._peekedResult; + } + + public async nextAsync(): Promise> { + if (this._peekedResult !== undefined) { + const result: IteratorResult = this._peekedResult; + this._peekedResult = undefined; + return result; + } + return await this._iterator.next(); + } +} + /** * @remarks * Used with {@link (Async:class).(forEachAsync:2)} and {@link (Async:class).(mapAsync:2)}. @@ -194,14 +219,14 @@ export class Async { options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; let concurrentUnitsInProgress: number = 0; - const iterator: Iterator | AsyncIterator = (iterable as AsyncIterable)[ + const baseIterator: Iterator | AsyncIterator = (iterable as AsyncIterable)[ Symbol.asyncIterator ].call(iterable); + const iterator: PeekableIterator = new PeekableIterator(baseIterator); let arrayIndex: number = 0; let iteratorIsComplete: boolean = false; let promiseHasResolvedOrRejected: boolean = false; - let nextIterator: IteratorResult | undefined = undefined; async function queueOperationsAsync(): Promise { while ( @@ -214,34 +239,31 @@ export class Async { // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; - const currentIteratorResult: IteratorResult = nextIterator || (await iterator.next()); + + // Peek at the next item to check its weight before committing to process it + const peekedIteratorResult: IteratorResult = await iterator.peekAsync(); // eslint-disable-next-line require-atomic-updates - iteratorIsComplete = !!currentIteratorResult.done; + iteratorIsComplete = !!peekedIteratorResult.done; if (!iteratorIsComplete) { - const currentIteratorValue: TEntry = currentIteratorResult.value; - Async.validateWeightedIterable(currentIteratorValue); + const peekedIteratorValue: TEntry = peekedIteratorResult.value; + Async.validateWeightedIterable(peekedIteratorValue); // Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit. - const weight: number = Math.min(currentIteratorValue.weight, concurrency); + const weight: number = Math.min(peekedIteratorValue.weight, concurrency); // Remove the "lock" from the concurrency check and only apply the current weight. // This should allow other operations to execute. concurrentUnitsInProgress -= limitedConcurrency; - // Wait until there's have enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` - const weightWithPeekedIsOverConcurrency: boolean = - concurrentUnitsInProgress + weight > concurrency; - const currentUnitsIsZero: boolean = concurrentUnitsInProgress === 0; - const taskWeightIsZero: boolean = weight === 0; - if (weightWithPeekedIsOverConcurrency && !currentUnitsIsZero && !taskWeightIsZero) { - // eslint-disable-next-line require-atomic-updates - nextIterator = currentIteratorResult; + // Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` + const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency; + const hasRunningTasks: boolean = concurrentUnitsInProgress > 0; + const isWeightedTask: boolean = weight > 0; + if (wouldExceedConcurrency && hasRunningTasks && isWeightedTask) { break; - } else { - // clear iterator - // eslint-disable-next-line require-atomic-updates - nextIterator = undefined; } + const currentIteratorResult: IteratorResult = await iterator.nextAsync(); + const currentIteratorValue: TEntry = currentIteratorResult.value; concurrentUnitsInProgress += weight; From ba1c69406964cf477ab06d2d7fab964f76c9d49d Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Thu, 11 Sep 2025 11:24:45 -0400 Subject: [PATCH 03/15] changelog --- .../eb-concurrency-bug-fix_2025-09-11-15-24.json | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json diff --git a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json new file mode 100644 index 00000000000..837b44a1abe --- /dev/null +++ b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "fix weighted concurrency scheduling to prevent oversubscription", + "type": "patch" + } + ], + "packageName": "@rushstack/node-core-library" +} \ No newline at end of file From c50ea4f56bd1fd1132caff7eebc0281898986793 Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Thu, 11 Sep 2025 18:40:55 -0400 Subject: [PATCH 04/15] go back to singleton next iterator --- libraries/node-core-library/src/Async.ts | 49 +++++-------------- .../node-core-library/src/test/Async.test.ts | 6 +-- 2 files changed, 16 insertions(+), 39 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index c46e6343bae..30315dcdc69 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -70,31 +70,6 @@ export interface IRunWithTimeoutOptions { timeoutMessage?: string; } -class PeekableIterator { - private _peekedResult: IteratorResult | undefined = undefined; - private readonly _iterator: Iterator | AsyncIterator; - - public constructor(iterator: Iterator | AsyncIterator) { - this._iterator = iterator; - } - - public async peekAsync(): Promise> { - if (this._peekedResult === undefined) { - this._peekedResult = await this._iterator.next(); - } - return this._peekedResult; - } - - public async nextAsync(): Promise> { - if (this._peekedResult !== undefined) { - const result: IteratorResult = this._peekedResult; - this._peekedResult = undefined; - return result; - } - return await this._iterator.next(); - } -} - /** * @remarks * Used with {@link (Async:class).(forEachAsync:2)} and {@link (Async:class).(mapAsync:2)}. @@ -219,14 +194,15 @@ export class Async { options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; let concurrentUnitsInProgress: number = 0; - const baseIterator: Iterator | AsyncIterator = (iterable as AsyncIterable)[ + const iterator: Iterator | AsyncIterator = (iterable as AsyncIterable)[ Symbol.asyncIterator ].call(iterable); - const iterator: PeekableIterator = new PeekableIterator(baseIterator); let arrayIndex: number = 0; let iteratorIsComplete: boolean = false; let promiseHasResolvedOrRejected: boolean = false; + // iterator that is stored when the loop exits early due to not enough concurrency + let nextIterator: IteratorResult | undefined = undefined; async function queueOperationsAsync(): Promise { while ( @@ -239,17 +215,15 @@ export class Async { // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; - - // Peek at the next item to check its weight before committing to process it - const peekedIteratorResult: IteratorResult = await iterator.peekAsync(); + const currentIteratorResult: IteratorResult = nextIterator || (await iterator.next()); // eslint-disable-next-line require-atomic-updates - iteratorIsComplete = !!peekedIteratorResult.done; + iteratorIsComplete = !!currentIteratorResult.done; if (!iteratorIsComplete) { - const peekedIteratorValue: TEntry = peekedIteratorResult.value; - Async.validateWeightedIterable(peekedIteratorValue); + const currentIteratorValue: TEntry = currentIteratorResult.value; + Async.validateWeightedIterable(currentIteratorValue); // Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit. - const weight: number = Math.min(peekedIteratorValue.weight, concurrency); + const weight: number = Math.min(currentIteratorValue.weight, concurrency); // Remove the "lock" from the concurrency check and only apply the current weight. // This should allow other operations to execute. @@ -260,10 +234,13 @@ export class Async { const hasRunningTasks: boolean = concurrentUnitsInProgress > 0; const isWeightedTask: boolean = weight > 0; if (wouldExceedConcurrency && hasRunningTasks && isWeightedTask) { + // eslint-disable-next-line require-atomic-updates + nextIterator = currentIteratorResult; break; + } else { + // eslint-disable-next-line require-atomic-updates + nextIterator = undefined; } - const currentIteratorResult: IteratorResult = await iterator.nextAsync(); - const currentIteratorValue: TEntry = currentIteratorResult.value; concurrentUnitsInProgress += weight; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 64c32fb8097..932d1b20d61 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -440,7 +440,7 @@ describe(Async.name, () => { expect(maxRunning).toEqual(3); }); - it.only('waits for a small and large operation to finish before scheduling more', async () => { + it('waits for a small and large operation to finish before scheduling more', async () => { let running: number = 0; let maxRunning: number = 0; @@ -557,7 +557,7 @@ describe(Async.name, () => { expect(jobToMaxConcurrentJobsRunning[1]).toEqual(1); // runs 0 weight expect(jobToMaxConcurrentJobsRunning[2]).toEqual(2); // runs 0 weight + 3 weight - expect(jobToMaxConcurrentJobsRunning[3]).toEqual(2); // runs 0 weight + 3 weight + expect(jobToMaxConcurrentJobsRunning[3]).toEqual(1); // runs 0 weight after 3 weight completes }); it('allows zero weight tasks to run alongside weight > concurrency task', async () => { @@ -584,7 +584,7 @@ describe(Async.name, () => { expect(jobToMaxConcurrentJobsRunning[1]).toEqual(1); // runs 0 weight expect(jobToMaxConcurrentJobsRunning[2]).toEqual(2); // runs 0 weight + 4 weight - expect(jobToMaxConcurrentJobsRunning[3]).toEqual(2); // runs 0 weight + 3 weight + expect(jobToMaxConcurrentJobsRunning[3]).toEqual(1); // runs 0 weight after 3 weight completes }); it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => { From 5a840652f97784ca8d7e1687617faab01c3235ba Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Fri, 12 Sep 2025 18:55:41 -0400 Subject: [PATCH 05/15] reviews --- .../eb-concurrency-bug-fix_2025-09-11-15-24.json | 2 +- libraries/node-core-library/src/Async.ts | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json index 837b44a1abe..0db1a802bc1 100644 --- a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json +++ b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json @@ -2,7 +2,7 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "fix weighted concurrency scheduling to prevent oversubscription", + "comment": "Fix Async.forEachAsync weighted concurrency scheduling to prevent oversubscription", "type": "patch" } ], diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 30315dcdc69..0c32df1c029 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -232,16 +232,14 @@ export class Async { // Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency; const hasRunningTasks: boolean = concurrentUnitsInProgress > 0; - const isWeightedTask: boolean = weight > 0; - if (wouldExceedConcurrency && hasRunningTasks && isWeightedTask) { + if (wouldExceedConcurrency && hasRunningTasks) { // eslint-disable-next-line require-atomic-updates nextIterator = currentIteratorResult; break; - } else { - // eslint-disable-next-line require-atomic-updates - nextIterator = undefined; } + // eslint-disable-next-line require-atomic-updates + nextIterator = undefined; concurrentUnitsInProgress += weight; Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) From 7688a325b53c79a667c7db42d2da76a668aa0c7f Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Tue, 16 Sep 2025 15:06:41 -0400 Subject: [PATCH 06/15] reviews --- libraries/node-core-library/src/Async.ts | 31 +++- .../node-core-library/src/test/Async.test.ts | 168 +++++++----------- libraries/operation-graph/src/Operation.ts | 19 +- .../src/api/RushProjectConfiguration.ts | 7 + .../src/logic/operations/Operation.ts | 7 + .../operations/WeightedOperationPlugin.ts | 5 +- .../src/schemas/rush-project.schema.json | 4 + 7 files changed, 127 insertions(+), 114 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 0c32df1c029..9982103a302 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -14,7 +14,8 @@ export interface IAsyncParallelismOptions { /** * Optionally used with the {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and * {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)} to limit the maximum - * number of concurrent promises to the specified number. + * number of concurrent promises to the specified number. Individual operations may exceed this + * limit based on their `allowOversubscription` property. */ concurrency?: number; @@ -82,12 +83,25 @@ export interface IWeighted { * Must be a whole number greater than or equal to 0. */ weight: number; + + /** + * Controls whether this operation can start even if doing so would exceed the total concurrency limit. + * If true (default), will start the operation even when it would exceed the limit. + * If false, waits until sufficient capacity is available. + */ + allowOversubscription?: boolean; +} + +interface IWeightedWrapper { + allowOversubscription?: boolean; + element: TElement; + weight: number; } function toWeightedIterator( iterable: Iterable | AsyncIterable, useWeights?: boolean -): AsyncIterable<{ element: TEntry; weight: number }> { +): AsyncIterable> { const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || (iterable as AsyncIterable)[Symbol.asyncIterator] @@ -99,7 +113,11 @@ function toWeightedIterator( // The await is necessary here, but TS will complain - it's a false positive. const { value, done } = await iterator.next(); return { - value: { element: value, weight: useWeights ? value?.weight : 1 }, + value: { + allowOversubscription: value?.allowOversubscription ?? true, + element: value, + weight: useWeights ? value?.weight : 1 + }, done: !!done }; } @@ -184,7 +202,7 @@ export class Async { return result; } - private static async _forEachWeightedAsync( + private static async _forEachWeightedAsync>( iterable: AsyncIterable, callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined @@ -231,8 +249,8 @@ export class Async { // Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency; - const hasRunningTasks: boolean = concurrentUnitsInProgress > 0; - if (wouldExceedConcurrency && hasRunningTasks) { + const allowOversubscription: boolean = currentIteratorValue.allowOversubscription ?? true; + if (!allowOversubscription && wouldExceedConcurrency) { // eslint-disable-next-line require-atomic-updates nextIterator = currentIteratorResult; break; @@ -320,6 +338,7 @@ export class Async { * number of concurrency units that can be in progress at once. The weight of each operation * determines how many concurrency units it takes up. For example, if the concurrency is 2 * and the first operation has a weight of 2, then only one more operation can be in progress. + * Operations may exceed the concurrency limit based on their `allowOversubscription` property. * * If `callback` throws a synchronous exception, or if it returns a promise that rejects, * then the loop stops immediately. Any remaining array items will be skipped, and diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 932d1b20d61..a1ece4786c1 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -3,6 +3,12 @@ import { Async, AsyncQueue } from '../Async'; +interface INumberWithWeight { + allowOversubscription?: boolean; + n: number; + weight: number; +} + describe(Async.name, () => { describe(Async.mapAsync.name, () => { it('handles an empty array correctly', async () => { @@ -307,11 +313,6 @@ describe(Async.name, () => { ).rejects.toThrow(expectedError); }); - interface INumberWithWeight { - n: number; - weight: number; - } - it('handles an empty array correctly', async () => { let running: number = 0; let maxRunning: number = 0; @@ -440,7 +441,7 @@ describe(Async.name, () => { expect(maxRunning).toEqual(3); }); - it('waits for a small and large operation to finish before scheduling more', async () => { + it('waits for a large operation to finish before scheduling more', async () => { let running: number = 0; let maxRunning: number = 0; @@ -464,7 +465,7 @@ describe(Async.name, () => { await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); expect(fn).toHaveBeenCalledTimes(8); - expect(maxRunning).toEqual(1); + expect(maxRunning).toEqual(2); }); it('allows operations with a weight of 0 and schedules them accordingly', async () => { @@ -489,104 +490,6 @@ describe(Async.name, () => { expect(maxRunning).toEqual(9); }); - it('ensures isolated job runs in isolation while small jobs never run alongside it', async () => { - const maxConcurrency: number = 10; - let running: number = 0; - const jobToMaxConcurrentJobsRunning: Record = {}; - - const array: INumberWithWeight[] = [ - { n: 1, weight: 1 }, - { n: 2, weight: 1 }, - { n: 3, weight: 1 }, - { n: 4, weight: maxConcurrency }, - { n: 5, weight: 1 }, - { n: 6, weight: 1 }, - { n: 7, weight: 1 } - ]; - - const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { - running++; - jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); - - // Simulate longer running time for heavyweight job - if (item.weight === maxConcurrency) { - await Async.sleepAsync(50); - } else { - await Async.sleepAsync(10); - } - - running--; - }); - - await Async.forEachAsync(array, fn, { concurrency: maxConcurrency, weighted: true }); - - expect(fn).toHaveBeenCalledTimes(7); - - // The heavyweight job (n=4) should run with only 1 concurrent job (itself) - expect(jobToMaxConcurrentJobsRunning[4]).toEqual(1); - - // Small jobs should be able to run concurrently with each other but not with heavyweight job - const nonIsolatedJobs = array.filter((job) => job.weight !== maxConcurrency); - nonIsolatedJobs.forEach((job) => { - expect(jobToMaxConcurrentJobsRunning[job.n]).toBeGreaterThanOrEqual(1); - expect(jobToMaxConcurrentJobsRunning[job.n]).toBeLessThanOrEqual(6); // All small jobs could theoretically run together - }); - }); - - it('allows zero weight tasks to run alongside weight = concurrency task', async () => { - const concurrency = 3; - const array: INumberWithWeight[] = [ - { n: 1, weight: 0 }, - { n: 2, weight: concurrency }, - { n: 3, weight: 0 } - ]; - - let running: number = 0; - const jobToMaxConcurrentJobsRunning: Record = {}; - - const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { - running++; - jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); - - await Async.sleepAsync(0); - - running--; - }); - - await Async.forEachAsync(array, fn, { concurrency: concurrency, weighted: true }); - - expect(jobToMaxConcurrentJobsRunning[1]).toEqual(1); // runs 0 weight - expect(jobToMaxConcurrentJobsRunning[2]).toEqual(2); // runs 0 weight + 3 weight - expect(jobToMaxConcurrentJobsRunning[3]).toEqual(1); // runs 0 weight after 3 weight completes - }); - - it('allows zero weight tasks to run alongside weight > concurrency task', async () => { - const concurrency = 3; - const array: INumberWithWeight[] = [ - { n: 1, weight: 0 }, - { n: 2, weight: concurrency + 1 }, - { n: 3, weight: 0 } - ]; - - let running: number = 0; - const jobToMaxConcurrentJobsRunning: Record = {}; - - const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { - running++; - jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); - - await Async.sleepAsync(0); - - running--; - }); - - await Async.forEachAsync(array, fn, { concurrency, weighted: true }); - - expect(jobToMaxConcurrentJobsRunning[1]).toEqual(1); // runs 0 weight - expect(jobToMaxConcurrentJobsRunning[2]).toEqual(2); // runs 0 weight + 4 weight - expect(jobToMaxConcurrentJobsRunning[3]).toEqual(1); // runs 0 weight after 3 weight completes - }); - it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => { let waitingIterators: number = 0; @@ -779,6 +682,61 @@ describe(Async.name, () => { expect(sleepSpy).toHaveBeenCalledTimes(1); expect(sleepSpy).toHaveBeenLastCalledWith(5); }); + + describe('allowOversubscription=false operations', () => { + it('waits for a small and large operation to finish before scheduling more', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1, allowOversubscription: false }, + { n: 2, weight: 10, allowOversubscription: false }, + { n: 3, weight: 1, allowOversubscription: false }, + { n: 4, weight: 10, allowOversubscription: false }, + { n: 5, weight: 1, allowOversubscription: false }, + { n: 6, weight: 10, allowOversubscription: false }, + { n: 7, weight: 1, allowOversubscription: false }, + { n: 8, weight: 10, allowOversubscription: false } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleepAsync(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(1); + }); + + it('handles operations where some have undefined and others have explicit values', async () => { + const concurrency = 3; + let running: number = 0; + let maxRunning: number = 0; + const array: INumberWithWeight[] = [ + { n: 1, weight: 3 }, // undefined allowOversubscription (should default to true) + { n: 2, weight: 3, allowOversubscription: false }, + { n: 3, weight: 1 }, // undefined allowOversubscription (should default to true) + { n: 4, weight: 1, allowOversubscription: true } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + maxRunning = Math.max(maxRunning, running); + + await Async.sleepAsync(0); + + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency, weighted: true }); + + expect(fn).toHaveBeenCalledTimes(4); + expect(maxRunning).toBeLessThanOrEqual(4); // Respects weight and concurrency limits + }); + }); }); }); diff --git a/libraries/operation-graph/src/Operation.ts b/libraries/operation-graph/src/Operation.ts index 75528ce1295..527c8aaaf04 100644 --- a/libraries/operation-graph/src/Operation.ts +++ b/libraries/operation-graph/src/Operation.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. // See LICENSE in the project root for license information. -import { InternalError } from '@rushstack/node-core-library'; +import { InternalError, type IWeighted } from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; import { Stopwatch } from './Stopwatch'; @@ -41,6 +41,13 @@ export interface IOperationOptions - implements IOperationStates + implements IOperationStates, IWeighted { /** * A set of all dependencies which must be executed before this operation is complete. @@ -174,6 +181,13 @@ export class Operation Date: Tue, 16 Sep 2025 15:06:52 -0400 Subject: [PATCH 07/15] update changelogs --- .../rush/eb-concurrency-bug-fix_2025-09-16-19-06.json | 10 ++++++++++ .../eb-concurrency-bug-fix_2025-09-11-15-24.json | 4 ++-- .../eb-concurrency-bug-fix_2025-09-16-19-06.json | 10 ++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json create mode 100644 common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json diff --git a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json new file mode 100644 index 00000000000..d0bf85feb5f --- /dev/null +++ b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@microsoft/rush", + "comment": "add allowOversubscription option to prevent running tasks from exceeding concurrency", + "type": "none" + } + ], + "packageName": "@microsoft/rush" +} \ No newline at end of file diff --git a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json index 0db1a802bc1..9b543642f9e 100644 --- a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json +++ b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json @@ -2,8 +2,8 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "Fix Async.forEachAsync weighted concurrency scheduling to prevent oversubscription", - "type": "patch" + "comment": "Add allowOversubscription option to prevent running tasks from exceeding concurrency", + "type": "minor" } ], "packageName": "@rushstack/node-core-library" diff --git a/common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json new file mode 100644 index 00000000000..809d1e7764e --- /dev/null +++ b/common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/operation-graph", + "comment": "add allowOversubscription option to prevent running tasks from exceeding concurrency", + "type": "minor" + } + ], + "packageName": "@rushstack/operation-graph" +} \ No newline at end of file From 381b71864c5b6898bee028020bdc4c76934e183b Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Tue, 16 Sep 2025 15:13:20 -0400 Subject: [PATCH 08/15] api reviews --- common/reviews/api/node-core-library.api.md | 1 + common/reviews/api/operation-graph.api.md | 5 ++++- common/reviews/api/rush-lib.api.md | 2 ++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index a0fc83dccdc..76160b35239 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -669,6 +669,7 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { // @public (undocumented) export interface IWeighted { + allowOversubscription?: boolean; weight: number; } diff --git a/common/reviews/api/operation-graph.api.md b/common/reviews/api/operation-graph.api.md index eac2ff25c29..7de4ee103ac 100644 --- a/common/reviews/api/operation-graph.api.md +++ b/common/reviews/api/operation-graph.api.md @@ -7,6 +7,7 @@ /// import type { ITerminal } from '@rushstack/terminal'; +import { IWeighted } from '@rushstack/node-core-library'; // @beta export type CommandMessageFromHost = ICancelCommandMessage | IExitCommandMessage | IRunCommandMessage | ISyncCommandMessage; @@ -65,6 +66,7 @@ export interface IOperationExecutionOptions { + allowOversubscription?: boolean | undefined; group?: OperationGroupRecord | undefined; metadata?: TMetadata | undefined; name: string; @@ -148,10 +150,11 @@ export interface IWatchLoopState { } // @beta -export class Operation implements IOperationStates { +export class Operation implements IOperationStates, IWeighted { constructor(options: IOperationOptions); // (undocumented) addDependency(dependency: Operation): void; + allowOversubscription: boolean; readonly consumers: Set>; criticalPathLength: number | undefined; // (undocumented) diff --git a/common/reviews/api/rush-lib.api.md b/common/reviews/api/rush-lib.api.md index 50b24af3f22..3562b51724c 100644 --- a/common/reviews/api/rush-lib.api.md +++ b/common/reviews/api/rush-lib.api.md @@ -690,6 +690,7 @@ export interface IOperationRunnerContext { // @alpha (undocumented) export interface IOperationSettings { allowCobuildWithoutCache?: boolean; + allowOversubscription?: boolean; dependsOnAdditionalFiles?: string[]; dependsOnEnvVars?: string[]; disableBuildCacheForOperation?: boolean; @@ -987,6 +988,7 @@ export class NpmOptionsConfiguration extends PackageManagerOptionsConfigurationB export class Operation { constructor(options: IOperationOptions); addDependency(dependency: Operation): void; + allowOversubscription: boolean; readonly associatedPhase: IPhase; readonly associatedProject: RushConfigurationProject; readonly consumers: ReadonlySet; From 18960cdf75a724161bd4da917ef7fc099b7cad83 Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Tue, 16 Sep 2025 18:11:50 -0400 Subject: [PATCH 09/15] test cleanup before review --- ...-concurrency-bug-fix_2025-09-16-19-06.json | 2 +- libraries/node-core-library/src/Async.ts | 2 +- .../node-core-library/src/test/Async.test.ts | 130 ++++++++++++++++-- libraries/operation-graph/src/Operation.ts | 2 +- .../src/api/RushProjectConfiguration.ts | 2 +- .../src/logic/operations/Operation.ts | 2 +- 6 files changed, 123 insertions(+), 17 deletions(-) diff --git a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json index d0bf85feb5f..48dae0db27c 100644 --- a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json +++ b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json @@ -3,7 +3,7 @@ { "packageName": "@microsoft/rush", "comment": "add allowOversubscription option to prevent running tasks from exceeding concurrency", - "type": "none" + "type": "minor" } ], "packageName": "@microsoft/rush" diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 9982103a302..7c9c0c331bd 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -85,7 +85,7 @@ export interface IWeighted { weight: number; /** - * Controls whether this operation can start even if doing so would exceed the total concurrency limit. + * Controls whether this operation can start, even if doing so would exceed the total concurrency limit.. * If true (default), will start the operation even when it would exceed the limit. * If false, waits until sufficient capacity is available. */ diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index a1ece4786c1..c06df289b87 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -60,6 +60,31 @@ describe(Async.name, () => { expect(maxRunning).toEqual(3); }); + it('respects concurrency limit with allowOversubscription=false in mapAsync', async () => { + const array: INumberWithWeight[] = [ + { n: 1, weight: 2, allowOversubscription: false }, + { n: 2, weight: 2, allowOversubscription: false } + ]; + + let running = 0; + let maxRunning = 0; + + const result = await Async.mapAsync( + array, + async (item) => { + running++; + maxRunning = Math.max(maxRunning, running); + await Async.sleepAsync(0); + running--; + return `result-${item.n}`; + }, + { concurrency: 3, weighted: true } + ); + + expect(result).toEqual(['result-1', 'result-2']); + expect(maxRunning).toEqual(1); + }); + it('rejects if a sync iterator throws an error', async () => { const expectedError: Error = new Error('iterator error'); let iteratorIndex: number = 0; @@ -536,6 +561,10 @@ describe(Async.name, () => { }); describe(Async.runWithRetriesAsync.name, () => { + afterEach(() => { + jest.restoreAllMocks(); + }); + it('Correctly handles a sync function that succeeds the first time', async () => { const expectedResult: string = 'RESULT'; const result: string = await Async.runWithRetriesAsync({ action: () => expectedResult, maxRetries: 0 }); @@ -684,6 +713,76 @@ describe(Async.name, () => { }); describe('allowOversubscription=false operations', () => { + it.each([ + { + concurrency: 4, + weight: 4, + expectedConcurrency: 1, + numberOfTasks: 4 + }, + { + concurrency: 4, + weight: 1, + expectedConcurrency: 4, + numberOfTasks: 4 + }, + { + concurrency: 4, + weight: 5, + expectedConcurrency: 1, + numberOfTasks: 2 + } + ])( + 'enforces strict concurrency limits when allowOversubscription=false: concurrency=$concurrency, weight=$weight, expects max $expectedConcurrency concurrent operations', + async ({ concurrency, weight, expectedConcurrency, numberOfTasks }) => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = Array.from({ length: numberOfTasks }, (v, i) => i).map((n) => ({ + n, + weight, + allowOversubscription: false + })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async () => { + running++; + await Async.sleepAsync(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency, weighted: true }); + expect(fn).toHaveBeenCalledTimes(numberOfTasks); + expect(maxRunning).toEqual(expectedConcurrency); + } + ); + + it('handles mixed weights enforcing a strict concurrency limit', async () => { + let running = 0; + let maxRunning = 0; + const startOrder: number[] = []; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1, allowOversubscription: false }, + { n: 2, weight: 3, allowOversubscription: false }, + { n: 3, weight: 1, allowOversubscription: false }, + { n: 4, weight: 2, allowOversubscription: false } + ]; + + const fn = jest.fn(async (item: INumberWithWeight) => { + running++; + startOrder.push(item.n); + maxRunning = Math.max(maxRunning, running); + await Async.sleepAsync(0); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 4, weighted: true }); + + expect(fn).toHaveBeenCalledTimes(4); + expect(maxRunning).toEqual(2); // Max should be limited by weight 3 task + }); + it('waits for a small and large operation to finish before scheduling more', async () => { let running: number = 0; let maxRunning: number = 0; @@ -711,30 +810,37 @@ describe(Async.name, () => { expect(maxRunning).toEqual(1); }); - it('handles operations where some have undefined and others have explicit values', async () => { - const concurrency = 3; + it('handles operations with mixed values of allowOversubscription', async () => { + const concurrency: number = 3; let running: number = 0; let maxRunning: number = 0; + const taskToMaxConcurrency: Record = {}; + const array: INumberWithWeight[] = [ - { n: 1, weight: 3 }, // undefined allowOversubscription (should default to true) - { n: 2, weight: 3, allowOversubscription: false }, - { n: 3, weight: 1 }, // undefined allowOversubscription (should default to true) - { n: 4, weight: 1, allowOversubscription: true } + { n: 1, weight: 1 }, // undefined allowOversubscription (should default to true) + { n: 2, weight: 2 }, // undefined allowOversubscription (should default to true) + { n: 3, weight: concurrency, allowOversubscription: false }, + { n: 4, weight: 1 }, // undefined allowOversubscription (should default to true) + { n: 5, weight: 1, allowOversubscription: true } ]; const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { running++; - maxRunning = Math.max(maxRunning, running); - + taskToMaxConcurrency[item.n] = running; await Async.sleepAsync(0); - + maxRunning = Math.max(maxRunning, running); running--; }); await Async.forEachAsync(array, fn, { concurrency, weighted: true }); - - expect(fn).toHaveBeenCalledTimes(4); - expect(maxRunning).toBeLessThanOrEqual(4); // Respects weight and concurrency limits + expect(fn).toHaveBeenCalledTimes(5); + expect(maxRunning).toEqual(2); + + expect(taskToMaxConcurrency[1]).toEqual(1); // task 1 + 2 + expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2 + expect(taskToMaxConcurrency[3]).toEqual(1); // task 3 (allowOversubscription = false) + expect(taskToMaxConcurrency[4]).toEqual(1); // task 4 + expect(taskToMaxConcurrency[5]).toEqual(2); // task 4 + 5 }); }); }); diff --git a/libraries/operation-graph/src/Operation.ts b/libraries/operation-graph/src/Operation.ts index 527c8aaaf04..f0c1d23833e 100644 --- a/libraries/operation-graph/src/Operation.ts +++ b/libraries/operation-graph/src/Operation.ts @@ -42,7 +42,7 @@ export interface IOperationOptions Date: Wed, 17 Sep 2025 13:42:18 -0400 Subject: [PATCH 10/15] move allowOversubscription to command-line.json --- common/reviews/api/node-core-library.api.md | 2 +- common/reviews/api/rush-lib.api.md | 2 - libraries/node-core-library/src/Async.ts | 23 ++--- .../node-core-library/src/test/Async.test.ts | 97 ++++++++++--------- libraries/rush-lib/src/api/CommandLineJson.ts | 2 + .../src/api/RushProjectConfiguration.ts | 7 -- .../rush-lib/src/cli/RushCommandLineParser.ts | 1 + .../cli/scriptActions/PhasedScriptAction.ts | 4 + .../src/logic/operations/Operation.ts | 7 -- .../operations/OperationExecutionManager.ts | 5 + .../operations/WeightedOperationPlugin.ts | 3 - .../test/OperationExecutionManager.test.ts | 5 + .../src/schemas/command-line.schema.json | 12 +++ .../src/schemas/rush-project.schema.json | 4 - 14 files changed, 90 insertions(+), 84 deletions(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 76160b35239..b5d7827f474 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -237,6 +237,7 @@ export type FolderItem = nodeFs.Dirent; // @public export interface IAsyncParallelismOptions { + allowOversubscription?: boolean; concurrency?: number; weighted?: boolean; } @@ -669,7 +670,6 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { // @public (undocumented) export interface IWeighted { - allowOversubscription?: boolean; weight: number; } diff --git a/common/reviews/api/rush-lib.api.md b/common/reviews/api/rush-lib.api.md index 3562b51724c..50b24af3f22 100644 --- a/common/reviews/api/rush-lib.api.md +++ b/common/reviews/api/rush-lib.api.md @@ -690,7 +690,6 @@ export interface IOperationRunnerContext { // @alpha (undocumented) export interface IOperationSettings { allowCobuildWithoutCache?: boolean; - allowOversubscription?: boolean; dependsOnAdditionalFiles?: string[]; dependsOnEnvVars?: string[]; disableBuildCacheForOperation?: boolean; @@ -988,7 +987,6 @@ export class NpmOptionsConfiguration extends PackageManagerOptionsConfigurationB export class Operation { constructor(options: IOperationOptions); addDependency(dependency: Operation): void; - allowOversubscription: boolean; readonly associatedPhase: IPhase; readonly associatedProject: RushConfigurationProject; readonly consumers: ReadonlySet; diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 7c9c0c331bd..b057b8fffc6 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -14,8 +14,7 @@ export interface IAsyncParallelismOptions { /** * Optionally used with the {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and * {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)} to limit the maximum - * number of concurrent promises to the specified number. Individual operations may exceed this - * limit based on their `allowOversubscription` property. + * number of concurrent promises to the specified number. */ concurrency?: number; @@ -24,6 +23,13 @@ export interface IAsyncParallelismOptions { * take up more or less than one concurrency unit. */ weighted?: boolean; + + /** + * Controls whether operations can start even if doing so would exceed the total concurrency limit. + * If true (default), will start operations even when they would exceed the limit. + * If false, waits until sufficient capacity is available. + */ + allowOversubscription?: boolean; } /** @@ -83,17 +89,9 @@ export interface IWeighted { * Must be a whole number greater than or equal to 0. */ weight: number; - - /** - * Controls whether this operation can start, even if doing so would exceed the total concurrency limit.. - * If true (default), will start the operation even when it would exceed the limit. - * If false, waits until sufficient capacity is available. - */ - allowOversubscription?: boolean; } interface IWeightedWrapper { - allowOversubscription?: boolean; element: TElement; weight: number; } @@ -114,7 +112,6 @@ function toWeightedIterator( const { value, done } = await iterator.next(); return { value: { - allowOversubscription: value?.allowOversubscription ?? true, element: value, weight: useWeights ? value?.weight : 1 }, @@ -249,7 +246,7 @@ export class Async { // Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency; - const allowOversubscription: boolean = currentIteratorValue.allowOversubscription ?? true; + const allowOversubscription: boolean = options?.allowOversubscription ?? true; if (!allowOversubscription && wouldExceedConcurrency) { // eslint-disable-next-line require-atomic-updates nextIterator = currentIteratorResult; @@ -338,7 +335,7 @@ export class Async { * number of concurrency units that can be in progress at once. The weight of each operation * determines how many concurrency units it takes up. For example, if the concurrency is 2 * and the first operation has a weight of 2, then only one more operation can be in progress. - * Operations may exceed the concurrency limit based on their `allowOversubscription` property. + * Operations may exceed the concurrency limit based on the `allowOversubscription` option. * * If `callback` throws a synchronous exception, or if it returns a promise that rejects, * then the loop stops immediately. Any remaining array items will be skipped, and diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index c06df289b87..76e991c6a97 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -4,7 +4,6 @@ import { Async, AsyncQueue } from '../Async'; interface INumberWithWeight { - allowOversubscription?: boolean; n: number; weight: number; } @@ -62,8 +61,8 @@ describe(Async.name, () => { it('respects concurrency limit with allowOversubscription=false in mapAsync', async () => { const array: INumberWithWeight[] = [ - { n: 1, weight: 2, allowOversubscription: false }, - { n: 2, weight: 2, allowOversubscription: false } + { n: 1, weight: 2 }, + { n: 2, weight: 2 } ]; let running = 0; @@ -78,7 +77,7 @@ describe(Async.name, () => { running--; return `result-${item.n}`; }, - { concurrency: 3, weighted: true } + { concurrency: 3, weighted: true, allowOversubscription: false } ); expect(result).toEqual(['result-1', 'result-2']); @@ -740,8 +739,7 @@ describe(Async.name, () => { const array: INumberWithWeight[] = Array.from({ length: numberOfTasks }, (v, i) => i).map((n) => ({ n, - weight, - allowOversubscription: false + weight })); const fn: (item: INumberWithWeight) => Promise = jest.fn(async () => { @@ -751,96 +749,101 @@ describe(Async.name, () => { running--; }); - await Async.forEachAsync(array, fn, { concurrency, weighted: true }); + await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false }); expect(fn).toHaveBeenCalledTimes(numberOfTasks); expect(maxRunning).toEqual(expectedConcurrency); } ); - it('handles mixed weights enforcing a strict concurrency limit', async () => { - let running = 0; - let maxRunning = 0; - const startOrder: number[] = []; + it('waits for a small and large operation to finish before scheduling more', async () => { + let running: number = 0; + let maxRunning: number = 0; const array: INumberWithWeight[] = [ - { n: 1, weight: 1, allowOversubscription: false }, - { n: 2, weight: 3, allowOversubscription: false }, - { n: 3, weight: 1, allowOversubscription: false }, - { n: 4, weight: 2, allowOversubscription: false } + { n: 1, weight: 1 }, + { n: 2, weight: 10 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10 }, + { n: 5, weight: 1 }, + { n: 6, weight: 10 }, + { n: 7, weight: 1 }, + { n: 8, weight: 10 } ]; - const fn = jest.fn(async (item: INumberWithWeight) => { + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { running++; - startOrder.push(item.n); - maxRunning = Math.max(maxRunning, running); await Async.sleepAsync(0); + maxRunning = Math.max(maxRunning, running); running--; }); - await Async.forEachAsync(array, fn, { concurrency: 4, weighted: true }); - - expect(fn).toHaveBeenCalledTimes(4); - expect(maxRunning).toEqual(2); // Max should be limited by weight 3 task + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(1); }); - it('waits for a small and large operation to finish before scheduling more', async () => { + it('handles operation with mixed weights', async () => { + const concurrency: number = 3; let running: number = 0; let maxRunning: number = 0; + const taskToMaxConcurrency: Record = {}; const array: INumberWithWeight[] = [ - { n: 1, weight: 1, allowOversubscription: false }, - { n: 2, weight: 10, allowOversubscription: false }, - { n: 3, weight: 1, allowOversubscription: false }, - { n: 4, weight: 10, allowOversubscription: false }, - { n: 5, weight: 1, allowOversubscription: false }, - { n: 6, weight: 10, allowOversubscription: false }, - { n: 7, weight: 1, allowOversubscription: false }, - { n: 8, weight: 10, allowOversubscription: false } + { n: 1, weight: 1 }, + { n: 2, weight: 2 }, + { n: 3, weight: concurrency }, + { n: 4, weight: 1 }, + { n: 5, weight: 1 } ]; const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { running++; + taskToMaxConcurrency[item.n] = running; await Async.sleepAsync(0); maxRunning = Math.max(maxRunning, running); running--; }); - await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); - expect(fn).toHaveBeenCalledTimes(8); - expect(maxRunning).toEqual(1); + await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false }); + expect(fn).toHaveBeenCalledTimes(5); + expect(maxRunning).toEqual(2); + + expect(taskToMaxConcurrency[1]).toEqual(1); // task 1 + expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2 + expect(taskToMaxConcurrency[3]).toEqual(1); // task 3 + expect(taskToMaxConcurrency[4]).toEqual(1); // task 4 + expect(taskToMaxConcurrency[5]).toEqual(2); // task 4 + 5 }); - it('handles operations with mixed values of allowOversubscription', async () => { - const concurrency: number = 3; + it('allows operations with weight 0 to be picked up when system is at max concurrency', async () => { let running: number = 0; let maxRunning: number = 0; const taskToMaxConcurrency: Record = {}; const array: INumberWithWeight[] = [ - { n: 1, weight: 1 }, // undefined allowOversubscription (should default to true) - { n: 2, weight: 2 }, // undefined allowOversubscription (should default to true) - { n: 3, weight: concurrency, allowOversubscription: false }, - { n: 4, weight: 1 }, // undefined allowOversubscription (should default to true) - { n: 5, weight: 1, allowOversubscription: true } + { n: 1, weight: 1 }, + { n: 2, weight: 0 }, + { n: 3, weight: 3 }, + { n: 4, weight: 1 } ]; const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { running++; taskToMaxConcurrency[item.n] = running; - await Async.sleepAsync(0); maxRunning = Math.max(maxRunning, running); + await Async.sleepAsync(0); running--; }); - await Async.forEachAsync(array, fn, { concurrency, weighted: true }); - expect(fn).toHaveBeenCalledTimes(5); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false }); + + expect(fn).toHaveBeenCalledTimes(4); expect(maxRunning).toEqual(2); - expect(taskToMaxConcurrency[1]).toEqual(1); // task 1 + 2 + expect(taskToMaxConcurrency[1]).toEqual(1); // task 1 expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2 - expect(taskToMaxConcurrency[3]).toEqual(1); // task 3 (allowOversubscription = false) + expect(taskToMaxConcurrency[3]).toEqual(2); // task 2 + 3 expect(taskToMaxConcurrency[4]).toEqual(1); // task 4 - expect(taskToMaxConcurrency[5]).toEqual(2); // task 4 + 5 }); }); }); diff --git a/libraries/rush-lib/src/api/CommandLineJson.ts b/libraries/rush-lib/src/api/CommandLineJson.ts index ba4f412176f..e6507e49633 100644 --- a/libraries/rush-lib/src/api/CommandLineJson.ts +++ b/libraries/rush-lib/src/api/CommandLineJson.ts @@ -23,6 +23,7 @@ export interface IBaseCommandJson { export interface IBulkCommandJson extends IBaseCommandJson { commandKind: 'bulk'; enableParallelism: boolean; + allowOversubscription?: boolean; ignoreDependencyOrder?: boolean; ignoreMissingScript?: boolean; incremental?: boolean; @@ -38,6 +39,7 @@ export interface IBulkCommandJson extends IBaseCommandJson { export interface IPhasedCommandWithoutPhasesJson extends IBaseCommandJson { commandKind: 'phased'; enableParallelism: boolean; + allowOversubscription?: boolean; incremental?: boolean; } diff --git a/libraries/rush-lib/src/api/RushProjectConfiguration.ts b/libraries/rush-lib/src/api/RushProjectConfiguration.ts index 669e16e6738..6a1bb5ad3a1 100644 --- a/libraries/rush-lib/src/api/RushProjectConfiguration.ts +++ b/libraries/rush-lib/src/api/RushProjectConfiguration.ts @@ -146,13 +146,6 @@ export interface IOperationSettings { * If true, this operation will never be skipped by the `--changed-projects-only` flag. */ ignoreChangedProjectsOnlyFlag?: boolean; - - /** - * Controls whether this operation can start, even if doing so would exceed the total concurrency limit.. - * If true (default), will start the operation even when it would exceed the limit. - * If false, waits until sufficient capacity is available. - */ - allowOversubscription?: boolean; } interface IOldRushProjectJson { diff --git a/libraries/rush-lib/src/cli/RushCommandLineParser.ts b/libraries/rush-lib/src/cli/RushCommandLineParser.ts index d21b637f931..feb3f052d6f 100644 --- a/libraries/rush-lib/src/cli/RushCommandLineParser.ts +++ b/libraries/rush-lib/src/cli/RushCommandLineParser.ts @@ -470,6 +470,7 @@ export class RushCommandLineParser extends CommandLineParser { enableParallelism: command.enableParallelism, incremental: command.incremental || false, disableBuildCache: command.disableBuildCache || false, + allowOversubscription: command.allowOversubscription ?? true, initialPhases: command.phases, originalPhases: command.originalPhases, diff --git a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts index 038e2386f31..906a3ca89ea 100644 --- a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts +++ b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts @@ -68,6 +68,7 @@ const PERF_PREFIX: 'rush:phasedScriptAction' = 'rush:phasedScriptAction'; */ export interface IPhasedScriptActionOptions extends IBaseScriptActionOptions { enableParallelism: boolean; + allowOversubscription: boolean; incremental: boolean; disableBuildCache: boolean; @@ -140,6 +141,7 @@ export class PhasedScriptAction extends BaseScriptAction i public readonly sessionAbortController: AbortController; private readonly _enableParallelism: boolean; + private readonly _allowOversubscription: boolean; private readonly _isIncrementalBuildAllowed: boolean; private readonly _disableBuildCache: boolean; private readonly _originalPhases: ReadonlySet; @@ -171,6 +173,7 @@ export class PhasedScriptAction extends BaseScriptAction i public constructor(options: IPhasedScriptActionOptions) { super(options); this._enableParallelism = options.enableParallelism; + this._allowOversubscription = options.allowOversubscription; this._isIncrementalBuildAllowed = options.incremental; this._disableBuildCache = options.disableBuildCache; this._originalPhases = options.originalPhases; @@ -583,6 +586,7 @@ export class PhasedScriptAction extends BaseScriptAction i quietMode: isQuietMode, debugMode: this.parser.isDebug, parallelism, + allowOversubscription: this._allowOversubscription, beforeExecuteOperationAsync: async (record: OperationExecutionRecord) => { return await this.hooks.beforeExecuteOperation.promise(record); }, diff --git a/libraries/rush-lib/src/logic/operations/Operation.ts b/libraries/rush-lib/src/logic/operations/Operation.ts index 4f93368da55..be3ec8ac5fc 100644 --- a/libraries/rush-lib/src/logic/operations/Operation.ts +++ b/libraries/rush-lib/src/logic/operations/Operation.ts @@ -107,13 +107,6 @@ export class Operation { */ public enabled: boolean; - /** - * Controls whether this operation can start, even if doing so would exceed the total concurrency limit.. - * If true (default), will start the operation even when it would exceed the limit. - * If false, waits until sufficient capacity is available. - */ - public allowOversubscription: boolean = true; - public constructor(options: IOperationOptions) { const { phase, project, runner, settings, logFilenameIdentifier } = options; this.associatedPhase = phase; diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index b88211eabb7..b0f2a3e0cb2 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -25,6 +25,7 @@ export interface IOperationExecutionManagerOptions { quietMode: boolean; debugMode: boolean; parallelism: number; + allowOversubscription: boolean; inputsSnapshot?: IInputsSnapshot; destination?: TerminalWritable; @@ -69,6 +70,7 @@ export class OperationExecutionManager { private readonly _executionRecords: Map; private readonly _quietMode: boolean; private readonly _parallelism: number; + private readonly _allowOversubscription: boolean; private readonly _totalOperations: number; private readonly _outputWritable: TerminalWritable; @@ -99,6 +101,7 @@ export class OperationExecutionManager { quietMode, debugMode, parallelism, + allowOversubscription, inputsSnapshot, beforeExecuteOperationAsync: beforeExecuteOperation, afterExecuteOperationAsync: afterExecuteOperation, @@ -112,6 +115,7 @@ export class OperationExecutionManager { this._hasAnyNonAllowedWarnings = false; this._hasAnyAborted = false; this._parallelism = parallelism; + this._allowOversubscription = allowOversubscription; this._beforeExecuteOperation = beforeExecuteOperation; this._afterExecuteOperation = afterExecuteOperation; @@ -304,6 +308,7 @@ export class OperationExecutionManager { } }, { + allowOversubscription: this._allowOversubscription, concurrency: maxParallelism, weighted: true } diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts index 4738b77169c..89efe7ceb12 100644 --- a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -43,9 +43,6 @@ function weightOperations( if (operationSettings?.weight) { operation.weight = operationSettings.weight; } - if (operationSettings?.allowOversubscription !== undefined) { - operation.allowOversubscription = operationSettings.allowOversubscription; - } } Async.validateWeightedIterable(operation); } diff --git a/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts b/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts index d86f2bc7f7b..b8168a8e232 100644 --- a/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts @@ -103,6 +103,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable }; }); @@ -185,6 +186,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable } ); @@ -229,6 +231,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable } ); @@ -250,6 +253,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable }; }); @@ -287,6 +291,7 @@ describe(OperationExecutionManager.name, () => { quietMode: false, debugMode: false, parallelism: 1, + allowOversubscription: true, destination: mockWritable }; }); diff --git a/libraries/rush-lib/src/schemas/command-line.schema.json b/libraries/rush-lib/src/schemas/command-line.schema.json index dd2655f30b2..a51ec961a65 100644 --- a/libraries/rush-lib/src/schemas/command-line.schema.json +++ b/libraries/rush-lib/src/schemas/command-line.schema.json @@ -66,6 +66,11 @@ "description": "If true then this command can be run in parallel, i.e. executed simultaneously for multiple projects.", "type": "boolean" }, + "allowOversubscription": { + "title": "allowOversubscription", + "type": "boolean", + "description": "Controls whether operations can start even if doing so would exceed the total concurrency limit. This setting only applies when 'enableParallelism' is true and operations have a 'weight' property configured in their rush-project.json operationSettings. If true (default), operations will start even when they would exceed the limit. If false, operations wait until sufficient capacity is available." + }, "ignoreDependencyOrder": { "title": "ignoreDependencyOrder", "description": "Normally projects will be processed according to their dependency order: a given project will not start processing the command until all of its dependencies have completed. This restriction doesn't apply for certain operations, for example, a \"clean\" task that deletes output files. In this case you can set \"ignoreDependencyOrder\" to true to increase parallelism.", @@ -110,6 +115,7 @@ "shellCommand": { "$ref": "#/definitions/anything" }, "enableParallelism": { "$ref": "#/definitions/anything" }, + "allowOversubscription": { "$ref": "#/definitions/anything" }, "ignoreDependencyOrder": { "$ref": "#/definitions/anything" }, "ignoreMissingScript": { "$ref": "#/definitions/anything" }, "incremental": { "$ref": "#/definitions/anything" }, @@ -181,6 +187,11 @@ "description": "If true then this command can be run in parallel, i.e. executed simultaneously for multiple projects.", "type": "boolean" }, + "allowOversubscription": { + "title": "allowOversubscription", + "type": "boolean", + "description": "Controls whether operations can start even if doing so would exceed the total concurrency limit. This setting only applies when 'enableParallelism' is true and operations have a 'weight' property configured in their rush-project.json operationSettings. If true (default), operations will start even when they would exceed the limit. If false, operations wait until sufficient capacity is available." + }, "incremental": { "title": "Incremental", "description": "If true then this command's phases will be incremental and support caching.", @@ -253,6 +264,7 @@ "safeForSimultaneousRushProcesses": { "$ref": "#/definitions/anything" }, "enableParallelism": { "$ref": "#/definitions/anything" }, + "allowOversubscription": { "$ref": "#/definitions/anything" }, "incremental": { "$ref": "#/definitions/anything" }, "phases": { "$ref": "#/definitions/anything" }, "watchOptions": { "$ref": "#/definitions/anything" }, diff --git a/libraries/rush-lib/src/schemas/rush-project.schema.json b/libraries/rush-lib/src/schemas/rush-project.schema.json index d0db1b68f51..52883eaa6c1 100644 --- a/libraries/rush-lib/src/schemas/rush-project.schema.json +++ b/libraries/rush-lib/src/schemas/rush-project.schema.json @@ -110,10 +110,6 @@ "ignoreChangedProjectsOnlyFlag": { "type": "boolean", "description": "If true, this operation never be skipped by the `--changed-projects-only` flag. This is useful for projects that bundle code from other packages." - }, - "allowOversubscription": { - "type": "boolean", - "description": "If true, allows this operation to start even if doing so would exceed the maximum concurrency limit determined by the -p flag. If false, waits until sufficient capacity is available. Defaults to true to maintain the original behavior where the concurrency limit could be exceeded." } } } From 5b20bcf512f8e5f7e6aafca9ebf7d9f96592b4ab Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Wed, 17 Sep 2025 14:28:48 -0400 Subject: [PATCH 11/15] remove un-needed changes --- ...-concurrency-bug-fix_2025-09-16-19-06.json | 10 ---------- common/reviews/api/operation-graph.api.md | 5 +---- libraries/node-core-library/src/Async.ts | 14 +++----------- libraries/operation-graph/src/Operation.ts | 19 ++----------------- .../operations/WeightedOperationPlugin.ts | 2 +- 5 files changed, 7 insertions(+), 43 deletions(-) delete mode 100644 common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json diff --git a/common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json deleted file mode 100644 index 809d1e7764e..00000000000 --- a/common/changes/@rushstack/operation-graph/eb-concurrency-bug-fix_2025-09-16-19-06.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "changes": [ - { - "packageName": "@rushstack/operation-graph", - "comment": "add allowOversubscription option to prevent running tasks from exceeding concurrency", - "type": "minor" - } - ], - "packageName": "@rushstack/operation-graph" -} \ No newline at end of file diff --git a/common/reviews/api/operation-graph.api.md b/common/reviews/api/operation-graph.api.md index 7de4ee103ac..eac2ff25c29 100644 --- a/common/reviews/api/operation-graph.api.md +++ b/common/reviews/api/operation-graph.api.md @@ -7,7 +7,6 @@ /// import type { ITerminal } from '@rushstack/terminal'; -import { IWeighted } from '@rushstack/node-core-library'; // @beta export type CommandMessageFromHost = ICancelCommandMessage | IExitCommandMessage | IRunCommandMessage | ISyncCommandMessage; @@ -66,7 +65,6 @@ export interface IOperationExecutionOptions { - allowOversubscription?: boolean | undefined; group?: OperationGroupRecord | undefined; metadata?: TMetadata | undefined; name: string; @@ -150,11 +148,10 @@ export interface IWatchLoopState { } // @beta -export class Operation implements IOperationStates, IWeighted { +export class Operation implements IOperationStates { constructor(options: IOperationOptions); // (undocumented) addDependency(dependency: Operation): void; - allowOversubscription: boolean; readonly consumers: Set>; criticalPathLength: number | undefined; // (undocumented) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index b057b8fffc6..5034d62e0d8 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -91,15 +91,10 @@ export interface IWeighted { weight: number; } -interface IWeightedWrapper { - element: TElement; - weight: number; -} - function toWeightedIterator( iterable: Iterable | AsyncIterable, useWeights?: boolean -): AsyncIterable> { +): AsyncIterable<{ element: TEntry; weight: number }> { const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || (iterable as AsyncIterable)[Symbol.asyncIterator] @@ -111,10 +106,7 @@ function toWeightedIterator( // The await is necessary here, but TS will complain - it's a false positive. const { value, done } = await iterator.next(); return { - value: { - element: value, - weight: useWeights ? value?.weight : 1 - }, + value: { element: value, weight: useWeights ? value?.weight : 1 }, done: !!done }; } @@ -199,7 +191,7 @@ export class Async { return result; } - private static async _forEachWeightedAsync>( + private static async _forEachWeightedAsync( iterable: AsyncIterable, callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined diff --git a/libraries/operation-graph/src/Operation.ts b/libraries/operation-graph/src/Operation.ts index f0c1d23833e..75528ce1295 100644 --- a/libraries/operation-graph/src/Operation.ts +++ b/libraries/operation-graph/src/Operation.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. // See LICENSE in the project root for license information. -import { InternalError, type IWeighted } from '@rushstack/node-core-library'; +import { InternalError } from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; import { Stopwatch } from './Stopwatch'; @@ -41,13 +41,6 @@ export interface IOperationOptions - implements IOperationStates, IWeighted + implements IOperationStates { /** * A set of all dependencies which must be executed before this operation is complete. @@ -181,13 +174,6 @@ export class Operation Date: Thu, 18 Sep 2025 15:00:33 -0400 Subject: [PATCH 12/15] reviews --- .../rush/eb-concurrency-bug-fix_2025-09-16-19-06.json | 4 ++-- .../eb-concurrency-bug-fix_2025-09-11-15-24.json | 2 +- libraries/node-core-library/src/Async.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json index 48dae0db27c..9f3742023b6 100644 --- a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json +++ b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json @@ -2,8 +2,8 @@ "changes": [ { "packageName": "@microsoft/rush", - "comment": "add allowOversubscription option to prevent running tasks from exceeding concurrency", - "type": "minor" + "comment": "Add an `allowOversubscription` option to the command definitions in `common/config/rush/command-line.json` to prevent running tasks from exceeding concurrency.", + "type": "" } ], "packageName": "@microsoft/rush" diff --git a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json index 9b543642f9e..2d0115b64f6 100644 --- a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json +++ b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json @@ -2,7 +2,7 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "Add allowOversubscription option to prevent running tasks from exceeding concurrency", + "comment": "Add an `allowOversubscription` option to the `Async` API functions which prevents running tasks from exceeding concurrency.", "type": "minor" } ], diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 5034d62e0d8..098c5d3eaa6 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -222,7 +222,7 @@ export class Async { // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; - const currentIteratorResult: IteratorResult = nextIterator || (await iterator.next()); + const currentIteratorResult: IteratorResult = nextIterator ?? (await iterator.next()); // eslint-disable-next-line require-atomic-updates iteratorIsComplete = !!currentIteratorResult.done; From 9bdaab2a9bf0119bbb57ea07fb2a724b46e90255 Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Thu, 18 Sep 2025 15:04:03 -0400 Subject: [PATCH 13/15] pull bump type from version-policies --- .../rush/eb-concurrency-bug-fix_2025-09-16-19-06.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json index 9f3742023b6..fecd9134afc 100644 --- a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json +++ b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json @@ -3,7 +3,7 @@ { "packageName": "@microsoft/rush", "comment": "Add an `allowOversubscription` option to the command definitions in `common/config/rush/command-line.json` to prevent running tasks from exceeding concurrency.", - "type": "" + "type": "patch" } ], "packageName": "@microsoft/rush" From 44f794f7b7d38c6082ed87e67a180ebd397a3dac Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Fri, 19 Sep 2025 10:40:03 -0400 Subject: [PATCH 14/15] change to none --- .../rush/eb-concurrency-bug-fix_2025-09-16-19-06.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json index fecd9134afc..12fe1d7e9c5 100644 --- a/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json +++ b/common/changes/@microsoft/rush/eb-concurrency-bug-fix_2025-09-16-19-06.json @@ -3,7 +3,7 @@ { "packageName": "@microsoft/rush", "comment": "Add an `allowOversubscription` option to the command definitions in `common/config/rush/command-line.json` to prevent running tasks from exceeding concurrency.", - "type": "patch" + "type": "none" } ], "packageName": "@microsoft/rush" From 2530f74899e9c0a15e1691ec274c1579843642a3 Mon Sep 17 00:00:00 2001 From: Pete Gonzalez <4673363+octogonz@users.noreply.github.com> Date: Thu, 25 Sep 2025 12:16:09 -0700 Subject: [PATCH 15/15] Change `IAsyncParallelismOptions.allowOversubscription` default to false; improve docs --- ...-concurrency-bug-fix_2025-09-11-15-24.json | 2 +- libraries/node-core-library/src/Async.ts | 23 ++++++++++++++----- .../node-core-library/src/test/Async.test.ts | 2 +- .../common/config/rush/command-line.json | 8 +++++++ .../rush-lib/src/cli/RushCommandLineParser.ts | 4 ++++ .../src/schemas/command-line.schema.json | 4 ++-- 6 files changed, 33 insertions(+), 10 deletions(-) diff --git a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json index 2d0115b64f6..6b24dd9f761 100644 --- a/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json +++ b/common/changes/@rushstack/node-core-library/eb-concurrency-bug-fix_2025-09-11-15-24.json @@ -2,7 +2,7 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "Add an `allowOversubscription` option to the `Async` API functions which prevents running tasks from exceeding concurrency.", + "comment": "Add an `allowOversubscription` option to the `Async` API functions which prevents running tasks from exceeding concurrency. Change its default to `false`.", "type": "minor" } ], diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 098c5d3eaa6..9fb6bb9fd08 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -19,15 +19,26 @@ export interface IAsyncParallelismOptions { concurrency?: number; /** - * Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can - * take up more or less than one concurrency unit. + * Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an + * operation can take up more or less than one concurrency unit. */ weighted?: boolean; /** - * Controls whether operations can start even if doing so would exceed the total concurrency limit. - * If true (default), will start operations even when they would exceed the limit. - * If false, waits until sufficient capacity is available. + * This option affects the handling of task weights, applying a softer policy that favors maximizing parallelism + * instead of avoiding overload. + * + * @remarks + * By default, a new task cannot start executing if doing so would push the total weight above the concurrency limit. + * Set `allowOversubscription` to true to relax this rule, allowing a new task to start as long as the current + * total weight is below the concurrency limit. Either way, a task cannot start if the total weight already equals + * the concurrency limit; therefore, `allowOversubscription` has no effect when all tasks have weight 1. + * + * Example: Suppose the concurrency limit is 8, and seven tasks are running whose weights are 1, so the current + * total weight is 7. If an available task has weight 2, that would push the total weight to 9, exceeding + * the limit. This task can start only if `allowOversubscription` is true. + * + * @defaultValue false */ allowOversubscription?: boolean; } @@ -238,7 +249,7 @@ export class Async { // Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency; - const allowOversubscription: boolean = options?.allowOversubscription ?? true; + const allowOversubscription: boolean = options?.allowOversubscription ?? false; if (!allowOversubscription && wouldExceedConcurrency) { // eslint-disable-next-line require-atomic-updates nextIterator = currentIteratorResult; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 76e991c6a97..c7c5468bc9d 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -487,7 +487,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: true }); expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(2); }); diff --git a/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json b/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json index 8b972ba7734..3760029c00d 100644 --- a/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json +++ b/libraries/rush-lib/assets/rush-init/common/config/rush/command-line.json @@ -77,6 +77,14 @@ */ "enableParallelism": false, + /** + * Controls whether weighted operations can start when the total weight would exceed the limit + * but is currently below the limit. This setting only applies when "enableParallelism" is true + * and operations have a "weight" property configured in their rush-project.json "operationSettings". + * Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit. + */ + "allowOversubscription": false, + /** * Normally projects will be processed according to their dependency order: a given project will not start * processing the command until all of its dependencies have completed. This restriction doesn't apply for diff --git a/libraries/rush-lib/src/cli/RushCommandLineParser.ts b/libraries/rush-lib/src/cli/RushCommandLineParser.ts index feb3f052d6f..19e6a095f34 100644 --- a/libraries/rush-lib/src/cli/RushCommandLineParser.ts +++ b/libraries/rush-lib/src/cli/RushCommandLineParser.ts @@ -470,6 +470,10 @@ export class RushCommandLineParser extends CommandLineParser { enableParallelism: command.enableParallelism, incremental: command.incremental || false, disableBuildCache: command.disableBuildCache || false, + + // The Async.forEachAsync() API defaults allowOversubscription=false, whereas Rush historically + // defaults allowOversubscription=true to favor faster builds rather than strictly staying below + // the CPU limit. allowOversubscription: command.allowOversubscription ?? true, initialPhases: command.phases, diff --git a/libraries/rush-lib/src/schemas/command-line.schema.json b/libraries/rush-lib/src/schemas/command-line.schema.json index a51ec961a65..0091e7bb7ae 100644 --- a/libraries/rush-lib/src/schemas/command-line.schema.json +++ b/libraries/rush-lib/src/schemas/command-line.schema.json @@ -69,7 +69,7 @@ "allowOversubscription": { "title": "allowOversubscription", "type": "boolean", - "description": "Controls whether operations can start even if doing so would exceed the total concurrency limit. This setting only applies when 'enableParallelism' is true and operations have a 'weight' property configured in their rush-project.json operationSettings. If true (default), operations will start even when they would exceed the limit. If false, operations wait until sufficient capacity is available." + "description": "Controls whether weighted operations can start when the total weight would exceed the limit but is currently below the limit. This setting only applies when \"enableParallelism\" is true and operations have a \"weight\" property configured in their rush-project.json \"operationSettings\". Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit." }, "ignoreDependencyOrder": { "title": "ignoreDependencyOrder", @@ -190,7 +190,7 @@ "allowOversubscription": { "title": "allowOversubscription", "type": "boolean", - "description": "Controls whether operations can start even if doing so would exceed the total concurrency limit. This setting only applies when 'enableParallelism' is true and operations have a 'weight' property configured in their rush-project.json operationSettings. If true (default), operations will start even when they would exceed the limit. If false, operations wait until sufficient capacity is available." + "description": "Controls whether weighted operations can start when the total weight would exceed the limit but is currently below the limit. This setting only applies when \"enableParallelism\" is true and operations have a \"weight\" property configured in their rush-project.json \"operationSettings\". Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit." }, "incremental": { "title": "Incremental",