Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions libraries/node-core-library/src/Async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ export interface IWeighted {
* Must be a whole number greater than or equal to 0.
*/
weight: number;

isolated?: boolean;
}

function toWeightedIterator<TEntry>(
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
useWeights?: boolean
): AsyncIterable<{ element: TEntry; weight: number }> {
): AsyncIterable<{ element: TEntry; weight: number; isolated: boolean }> {
const iterator: Iterator<TEntry> | AsyncIterator<TEntry, TEntry> = (
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
Expand All @@ -99,7 +101,11 @@ function toWeightedIterator<TEntry>(
// The await is necessary here, but TS will complain - it's a false positive.
const { value, done } = await iterator.next();
return {
value: { element: value, weight: useWeights ? value?.weight : 1 },
value: {
element: value,
weight: useWeights ? value?.weight : 1,
isolated: useWeights ? value?.isolated : false
},
done: !!done
};
}
Expand Down Expand Up @@ -184,7 +190,10 @@ export class Async {
return result;
}

private static async _forEachWeightedAsync<TReturn, TEntry extends { weight: number; element: TReturn }>(
private static async _forEachWeightedAsync<
TReturn,
TEntry extends { weight: number; element: TReturn; isolated: boolean }
>(
iterable: AsyncIterable<TEntry>,
callback: (entry: TReturn, arrayIndex: number) => Promise<void>,
options?: IAsyncParallelismOptions | undefined
Expand All @@ -201,6 +210,7 @@ export class Async {
let arrayIndex: number = 0;
let iteratorIsComplete: boolean = false;
let promiseHasResolvedOrRejected: boolean = false;
let isolatedTask: IteratorResult<TEntry> | undefined = undefined;

async function queueOperationsAsync(): Promise<void> {
while (
Expand All @@ -213,7 +223,7 @@ export class Async {
// there will be effectively no cap on the number of operations waiting.
const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency;
concurrentUnitsInProgress += limitedConcurrency;
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
const currentIteratorResult: IteratorResult<TEntry> = isolatedTask ?? (await iterator.next());
// eslint-disable-next-line require-atomic-updates
iteratorIsComplete = !!currentIteratorResult.done;

Expand All @@ -222,6 +232,20 @@ export class Async {
Async.validateWeightedIterable(currentIteratorValue);
// Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit.
const weight: number = Math.min(currentIteratorValue.weight, concurrency);
const shouldIsolate: boolean = currentIteratorValue.isolated ?? false;

// Wait until there's have enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync`
if (shouldIsolate) {
if (concurrentUnitsInProgress - limitedConcurrency > 0) {
// eslint-disable-next-line require-atomic-updates
isolatedTask = currentIteratorResult;
// Remove the "lock" before breaking
concurrentUnitsInProgress -= limitedConcurrency;
break;
}
}
// eslint-disable-next-line require-atomic-updates
isolatedTask = undefined;

// Remove the "lock" from the concurrency check and only apply the current weight.
// This should allow other operations to execute.
Expand Down
43 changes: 43 additions & 0 deletions libraries/node-core-library/src/test/Async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,49 @@ describe(Async.name, () => {
expect(maxRunning).toEqual(9);
});

it('ensures isolated job runs in isolation while small jobs never run alongside it', async () => {
let running: number = 0;
const jobToMaxConcurrentJobsRunning: Record<number, number> = {};

const array: (INumberWithWeight & { isolated?: boolean })[] = [
{ n: 1, weight: 1 },
{ n: 2, weight: 1 },
{ n: 3, weight: 1 },
{ n: 4, weight: 10, isolated: true }, // job that should run alone
{ n: 5, weight: 1 },
{ n: 6, weight: 1 },
{ n: 7, weight: 1 }
];

const fn: (item: INumberWithWeight) => Promise<void> = jest.fn(async (item) => {
running++;
jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running);

// Simulate longer running time for heavyweight job
if (item.weight === 10) {
await Async.sleepAsync(50);
} else {
await Async.sleepAsync(10);
}

running--;
});

await Async.forEachAsync(array, fn, { concurrency: 10, weighted: true });

expect(fn).toHaveBeenCalledTimes(7);

// The heavyweight job (n=4) should run with only 1 concurrent job (itself)
expect(jobToMaxConcurrentJobsRunning[4]).toEqual(1);

// Small jobs should be able to run concurrently with each other but not with heavyweight job
const nonIsolatedJobs = array.filter((job) => !!job.isolated);
nonIsolatedJobs.forEach((job) => {
expect(jobToMaxConcurrentJobsRunning[job.n]).toBeGreaterThanOrEqual(1);
expect(jobToMaxConcurrentJobsRunning[job.n]).toBeLessThanOrEqual(6); // All small jobs could theoretically run together
});
});

it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => {
let waitingIterators: number = 0;

Expand Down
Loading