Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@microsoft/rush",
"comment": "Add an `allowOversubscription` option to the command definitions in `common/config/rush/command-line.json` to prevent running tasks from exceeding concurrency.",
"type": "none"
}
],
"packageName": "@microsoft/rush"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/node-core-library",
"comment": "Add an `allowOversubscription` option to the `Async` API functions which prevents running tasks from exceeding concurrency. Change its default to `false`.",
"type": "minor"
}
],
"packageName": "@rushstack/node-core-library"
}
1 change: 1 addition & 0 deletions common/reviews/api/node-core-library.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ export type FolderItem = nodeFs.Dirent;

// @public
export interface IAsyncParallelismOptions {
allowOversubscription?: boolean;
concurrency?: number;
weighted?: boolean;
}
Expand Down
41 changes: 37 additions & 4 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,28 @@ export interface IAsyncParallelismOptions {
concurrency?: number;

/**
* Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can
* take up more or less than one concurrency unit.
* Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an
* operation can take up more or less than one concurrency unit.
*/
weighted?: boolean;

/**
* This option affects the handling of task weights, applying a softer policy that favors maximizing parallelism
* instead of avoiding overload.
*
* @remarks
* By default, a new task cannot start executing if doing so would push the total weight above the concurrency limit.
* Set `allowOversubscription` to true to relax this rule, allowing a new task to start as long as the current
* total weight is below the concurrency limit. Either way, a task cannot start if the total weight already equals
* the concurrency limit; therefore, `allowOversubscription` has no effect when all tasks have weight 1.
*
* Example: Suppose the concurrency limit is 8, and seven tasks are running whose weights are 1, so the current
* total weight is 7. If an available task has weight 2, that would push the total weight to 9, exceeding
* the limit. This task can start only if `allowOversubscription` is true.
*
* @defaultValue false
*/
allowOversubscription?: boolean;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  /**
   * Controls whether operations can start even if doing so would exceed the total concurrency limit.
   * If true (default), will start operations even when they would exceed the limit.
   * If false, waits until sufficient capacity is available.
   */
  allowOversubscription?: boolean;

Rush Stack's convention is that optional booleans always default to false.

The allowOversubscription=false behavior seems like a more natural/intuitive operation, so maybe we should make false the default?

Although that's technically a "breaking" change, a bit less parallelism in an edge case is unlikely to break anyone's existing code. In fact, it's arguably a bugfix.

@dmichon-msft

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the comments and changed the default to false for Async but not for Rush.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ethanburrelldd Do you think we should change the default for Rush as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the default for Async to false makes sense, but I'm worried about changing the default for Rush. It might slow things down for repo maintainers who update their version. Since the previous behavior allowed for oversubscription, keeping true as the default for Rush seems like the right move to avoid breaking things for other maintainers.

}

/**
Expand Down Expand Up @@ -201,6 +219,8 @@ export class Async {
let arrayIndex: number = 0;
let iteratorIsComplete: boolean = false;
let promiseHasResolvedOrRejected: boolean = false;
// iterator that is stored when the loop exits early due to not enough concurrency
let nextIterator: IteratorResult<TEntry> | undefined = undefined;

async function queueOperationsAsync(): Promise<void> {
while (
Expand All @@ -213,7 +233,7 @@ export class Async {
// there will be effectively no cap on the number of operations waiting.
const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency;
concurrentUnitsInProgress += limitedConcurrency;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
const currentIteratorResult: IteratorResult<TEntry> = nextIterator ?? (await iterator.next());
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

Expand All @@ -225,9 +245,21 @@ export class Async {

// Remove the "lock" from the concurrency check and only apply the current weight.
// This should allow other operations to execute.
concurrentUnitsInProgress += weight;
concurrentUnitsInProgress -= limitedConcurrency;

// Wait until there's enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync`
const wouldExceedConcurrency: boolean = concurrentUnitsInProgress + weight > concurrency;
const allowOversubscription: boolean = options?.allowOversubscription ?? false;
if (!allowOversubscription && wouldExceedConcurrency) {
// eslint-disable-next-line require-atomic-updates
nextIterator = currentIteratorResult;
break;
}

// eslint-disable-next-line require-atomic-updates
nextIterator = undefined;
concurrentUnitsInProgress += weight;

Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
.then(async () => {
// Remove the operation completely from the in progress units.
Expand Down Expand Up @@ -306,6 +338,7 @@ export class Async {
* number of concurrency units that can be in progress at once. The weight of each operation
* determines how many concurrency units it takes up. For example, if the concurrency is 2
* and the first operation has a weight of 2, then only one more operation can be in progress.
* Operations may exceed the concurrency limit based on the `allowOversubscription` option.
*
* If `callback` throws a synchronous exception, or if it returns a promise that rejects,
* then the loop stops immediately. Any remaining array items will be skipped, and
Expand Down
184 changes: 171 additions & 13 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

import { Async, AsyncQueue } from '../Async';

interface INumberWithWeight {
n: number;
weight: number;
}

describe(Async.name, () => {
describe(Async.mapAsync.name, () => {
it('handles an empty array correctly', async () => {
Expand All @@ -27,13 +32,6 @@ describe(Async.name, () => {
expect(fn).toHaveBeenNthCalledWith(3, 3, 2);
});

it('returns the same result as built-in Promise.all', async () => {
const array: number[] = [1, 2, 3, 4, 5, 6, 7, 8];
const fn: (item: number) => Promise<string> = async (item) => `result ${item}`;

expect(await Async.mapAsync(array, fn)).toEqual(await Promise.all(array.map(fn)));
});

it('if concurrency is set, ensures no more than N operations occur in parallel', async () => {
let running: number = 0;
let maxRunning: number = 0;
Expand Down Expand Up @@ -61,6 +59,31 @@ describe(Async.name, () => {
expect(maxRunning).toEqual(3);
});

it('respects concurrency limit with allowOversubscription=false in mapAsync', async () => {
const array: INumberWithWeight[] = [
{ n: 1, weight: 2 },
{ n: 2, weight: 2 }
];

let running = 0;
let maxRunning = 0;

const result = await Async.mapAsync(
array,
async (item) => {
running++;
maxRunning = Math.max(maxRunning, running);
await Async.sleepAsync(0);
running--;
return `result-${item.n}`;
},
{ concurrency: 3, weighted: true, allowOversubscription: false }
);

expect(result).toEqual(['result-1', 'result-2']);
expect(maxRunning).toEqual(1);
});

it('rejects if a sync iterator throws an error', async () => {
const expectedError: Error = new Error('iterator error');
let iteratorIndex: number = 0;
Expand Down Expand Up @@ -314,11 +337,6 @@ describe(Async.name, () => {
).rejects.toThrow(expectedError);
});

interface INumberWithWeight {
n: number;
weight: number;
}

it('handles an empty array correctly', async () => {
let running: number = 0;
let maxRunning: number = 0;
Expand Down Expand Up @@ -469,7 +487,7 @@ describe(Async.name, () => {
running--;
});

await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true });
await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: true });
expect(fn).toHaveBeenCalledTimes(8);
expect(maxRunning).toEqual(2);
});
Expand Down Expand Up @@ -542,6 +560,10 @@ describe(Async.name, () => {
});

describe(Async.runWithRetriesAsync.name, () => {
afterEach(() => {
jest.restoreAllMocks();
});

it('Correctly handles a sync function that succeeds the first time', async () => {
const expectedResult: string = 'RESULT';
const result: string = await Async.runWithRetriesAsync({ action: () => expectedResult, maxRetries: 0 });
Expand Down Expand Up @@ -688,6 +710,142 @@ describe(Async.name, () => {
expect(sleepSpy).toHaveBeenCalledTimes(1);
expect(sleepSpy).toHaveBeenLastCalledWith(5);
});

describe('allowOversubscription=false operations', () => {
it.each([
{
concurrency: 4,
weight: 4,
expectedConcurrency: 1,
numberOfTasks: 4
},
{
concurrency: 4,
weight: 1,
expectedConcurrency: 4,
numberOfTasks: 4
},
{
concurrency: 4,
weight: 5,
expectedConcurrency: 1,
numberOfTasks: 2
}
])(
'enforces strict concurrency limits when allowOversubscription=false: concurrency=$concurrency, weight=$weight, expects max $expectedConcurrency concurrent operations',
async ({ concurrency, weight, expectedConcurrency, numberOfTasks }) => {
let running: number = 0;
let maxRunning: number = 0;

const array: INumberWithWeight[] = Array.from({ length: numberOfTasks }, (v, i) => i).map((n) => ({
n,
weight
}));

const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async () => {
running++;
await Async.sleepAsync(0);
maxRunning = Math.max(maxRunning, running);
running--;
});

await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false });
expect(fn).toHaveBeenCalledTimes(numberOfTasks);
expect(maxRunning).toEqual(expectedConcurrency);
}
);

it('waits for a small and large operation to finish before scheduling more', async () => {
let running: number = 0;
let maxRunning: number = 0;

const array: INumberWithWeight[] = [
{ n: 1, weight: 1 },
{ n: 2, weight: 10 },
{ n: 3, weight: 1 },
{ n: 4, weight: 10 },
{ n: 5, weight: 1 },
{ n: 6, weight: 10 },
{ n: 7, weight: 1 },
{ n: 8, weight: 10 }
];

const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
running++;
await Async.sleepAsync(0);
maxRunning = Math.max(maxRunning, running);
running--;
});

await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false });
expect(fn).toHaveBeenCalledTimes(8);
expect(maxRunning).toEqual(1);
});

it('handles operation with mixed weights', async () => {
const concurrency: number = 3;
let running: number = 0;
let maxRunning: number = 0;
const taskToMaxConcurrency: Record<number, number> = {};

const array: INumberWithWeight[] = [
{ n: 1, weight: 1 },
{ n: 2, weight: 2 },
{ n: 3, weight: concurrency },
{ n: 4, weight: 1 },
{ n: 5, weight: 1 }
];

const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
running++;
taskToMaxConcurrency[item.n] = running;
await Async.sleepAsync(0);
maxRunning = Math.max(maxRunning, running);
running--;
});

await Async.forEachAsync(array, fn, { concurrency, weighted: true, allowOversubscription: false });
expect(fn).toHaveBeenCalledTimes(5);
expect(maxRunning).toEqual(2);

expect(taskToMaxConcurrency[1]).toEqual(1); // task 1
expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2
expect(taskToMaxConcurrency[3]).toEqual(1); // task 3
expect(taskToMaxConcurrency[4]).toEqual(1); // task 4
expect(taskToMaxConcurrency[5]).toEqual(2); // task 4 + 5
});

it('allows operations with weight 0 to be picked up when system is at max concurrency', async () => {
let running: number = 0;
let maxRunning: number = 0;
const taskToMaxConcurrency: Record<number, number> = {};

const array: INumberWithWeight[] = [
{ n: 1, weight: 1 },
{ n: 2, weight: 0 },
{ n: 3, weight: 3 },
{ n: 4, weight: 1 }
];

const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
running++;
taskToMaxConcurrency[item.n] = running;
maxRunning = Math.max(maxRunning, running);
await Async.sleepAsync(0);
running--;
});

await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true, allowOversubscription: false });

expect(fn).toHaveBeenCalledTimes(4);
expect(maxRunning).toEqual(2);

expect(taskToMaxConcurrency[1]).toEqual(1); // task 1
expect(taskToMaxConcurrency[2]).toEqual(2); // task 1 + 2
expect(taskToMaxConcurrency[3]).toEqual(2); // task 2 + 3
expect(taskToMaxConcurrency[4]).toEqual(1); // task 4
});
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@
*/
"enableParallelism": false,

/**
* Controls whether weighted operations can start when the total weight would exceed the limit
* but is currently below the limit. This setting only applies when "enableParallelism" is true
* and operations have a "weight" property configured in their rush-project.json "operationSettings".
* Choose true (the default) to favor parallelism. Choose false to strictly stay under the limit.
*/
"allowOversubscription": false,

/**
* Normally projects will be processed according to their dependency order: a given project will not start
* processing the command until all of its dependencies have completed. This restriction doesn't apply for
Expand Down
2 changes: 2 additions & 0 deletions libraries/rush-lib/src/api/CommandLineJson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface IBaseCommandJson {
export interface IBulkCommandJson extends IBaseCommandJson {
commandKind: 'bulk';
enableParallelism: boolean;
allowOversubscription?: boolean;
ignoreDependencyOrder?: boolean;
ignoreMissingScript?: boolean;
incremental?: boolean;
Expand All @@ -38,6 +39,7 @@ export interface IBulkCommandJson extends IBaseCommandJson {
export interface IPhasedCommandWithoutPhasesJson extends IBaseCommandJson {
commandKind: 'phased';
enableParallelism: boolean;
allowOversubscription?: boolean;
incremental?: boolean;
}

Expand Down
5 changes: 5 additions & 0 deletions libraries/rush-lib/src/cli/RushCommandLineParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,11 @@ export class RushCommandLineParser extends CommandLineParser {
incremental: command.incremental || false,
disableBuildCache: command.disableBuildCache || false,

// The Async.forEachAsync() API defaults allowOversubscription=false, whereas Rush historically
// defaults allowOversubscription=true to favor faster builds rather than strictly staying below
// the CPU limit.
allowOversubscription: command.allowOversubscription ?? true,

initialPhases: command.phases,
originalPhases: command.originalPhases,
watchPhases: command.watchPhases,
Expand Down
Loading
Loading