diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 85d0b0fcdb0..4893ed5591a 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -82,12 +82,14 @@ export interface IWeighted { * Must be a whole number greater than or equal to 0. */ weight: number; + + isolated?: boolean; } function toWeightedIterator( iterable: Iterable | AsyncIterable, useWeights?: boolean -): AsyncIterable<{ element: TEntry; weight: number }> { +): AsyncIterable<{ element: TEntry; weight: number; isolated: boolean }> { const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || (iterable as AsyncIterable)[Symbol.asyncIterator] @@ -99,7 +101,11 @@ function toWeightedIterator( // The await is necessary here, but TS will complain - it's a false positive. const { value, done } = await iterator.next(); return { - value: { element: value, weight: useWeights ? value?.weight : 1 }, + value: { + element: value, + weight: useWeights ? value?.weight : 1, + isolated: useWeights ? value?.isolated : false + }, done: !!done }; } @@ -184,7 +190,10 @@ export class Async { return result; } - private static async _forEachWeightedAsync( + private static async _forEachWeightedAsync< + TReturn, + TEntry extends { weight: number; element: TReturn; isolated: boolean } + >( iterable: AsyncIterable, callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined @@ -201,6 +210,7 @@ export class Async { let arrayIndex: number = 0; let iteratorIsComplete: boolean = false; let promiseHasResolvedOrRejected: boolean = false; + let isolatedTask: IteratorResult | undefined = undefined; async function queueOperationsAsync(): Promise { while ( @@ -213,7 +223,7 @@ export class Async { // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; - const currentIteratorResult: IteratorResult = await iterator.next(); + const currentIteratorResult: IteratorResult = isolatedTask ?? (await iterator.next()); // eslint-disable-next-line require-atomic-updates iteratorIsComplete = !!currentIteratorResult.done; @@ -222,6 +232,20 @@ export class Async { Async.validateWeightedIterable(currentIteratorValue); // Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit. const weight: number = Math.min(currentIteratorValue.weight, concurrency); + const shouldIsolate: boolean = currentIteratorValue.isolated ?? false; + + // Wait until there's have enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` + if (shouldIsolate) { + if (concurrentUnitsInProgress - limitedConcurrency > 0) { + // eslint-disable-next-line require-atomic-updates + isolatedTask = currentIteratorResult; + // Remove the "lock" before breaking + concurrentUnitsInProgress -= limitedConcurrency; + break; + } + } + // eslint-disable-next-line require-atomic-updates + isolatedTask = undefined; // Remove the "lock" from the concurrency check and only apply the current weight. // This should allow other operations to execute. diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 5442723e8b2..55cb2859fc5 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -496,6 +496,49 @@ describe(Async.name, () => { expect(maxRunning).toEqual(9); }); + it('ensures isolated job runs in isolation while small jobs never run alongside it', async () => { + let running: number = 0; + const jobToMaxConcurrentJobsRunning: Record = {}; + + const array: (INumberWithWeight & { isolated?: boolean })[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 1 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10, isolated: true }, // job that should run alone + { n: 5, weight: 1 }, + { n: 6, weight: 1 }, + { n: 7, weight: 1 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); + + // Simulate longer running time for heavyweight job + if (item.weight === 10) { + await Async.sleepAsync(50); + } else { + await Async.sleepAsync(10); + } + + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 10, weighted: true }); + + expect(fn).toHaveBeenCalledTimes(7); + + // The heavyweight job (n=4) should run with only 1 concurrent job (itself) + expect(jobToMaxConcurrentJobsRunning[4]).toEqual(1); + + // Small jobs should be able to run concurrently with each other but not with heavyweight job + const nonIsolatedJobs = array.filter((job) => !!job.isolated); + nonIsolatedJobs.forEach((job) => { + expect(jobToMaxConcurrentJobsRunning[job.n]).toBeGreaterThanOrEqual(1); + expect(jobToMaxConcurrentJobsRunning[job.n]).toBeLessThanOrEqual(6); // All small jobs could theoretically run together + }); + }); + it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => { let waitingIterators: number = 0;