From 7e085cc3f59025a4190c37a3214033244d2faf16 Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Tue, 9 Sep 2025 16:32:52 -0400 Subject: [PATCH 1/2] first choice --- libraries/node-core-library/src/Async.ts | 5 +++ .../node-core-library/src/test/Async.test.ts | 43 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 85d0b0fcdb0..f9e422a4792 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -223,6 +223,11 @@ export class Async { // Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit. const weight: number = Math.min(currentIteratorValue.weight, concurrency); + // Wait until there's have enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` + if (concurrentUnitsInProgress + weight - limitedConcurrency > concurrency) { + break; + } + // Remove the "lock" from the concurrency check and only apply the current weight. // This should allow other operations to execute. concurrentUnitsInProgress += weight; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 5442723e8b2..44b0eb46c72 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -496,6 +496,49 @@ describe(Async.name, () => { expect(maxRunning).toEqual(9); }); + it('ensures isolated job runs in isolation while small jobs never run alongside it', async () => { + let running: number = 0; + const jobToMaxConcurrentJobsRunning: Record = {}; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 1 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10 }, // job that should run alone + { n: 5, weight: 1 }, + { n: 6, weight: 1 }, + { n: 7, weight: 1 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + jobToMaxConcurrentJobsRunning[item.n] = Math.max(jobToMaxConcurrentJobsRunning[item.n] || 0, running); + + // Simulate longer running time for heavyweight job + if (item.weight === 10) { + await Async.sleepAsync(50); + } else { + await Async.sleepAsync(10); + } + + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 10, weighted: true }); + + expect(fn).toHaveBeenCalledTimes(7); + + // The heavyweight job (n=4) should run with only 1 concurrent job (itself) + expect(jobToMaxConcurrentJobsRunning[4]).toEqual(1); + + // Small jobs should be able to run concurrently with each other but not with heavyweight job + const nonIsolatedJobs = array.filter((job) => job.weight !== 10); + nonIsolatedJobs.forEach((job) => { + expect(jobToMaxConcurrentJobsRunning[job.n]).toBeGreaterThanOrEqual(1); + expect(jobToMaxConcurrentJobsRunning[job.n]).toBeLessThanOrEqual(6); // All small jobs could theoretically run together + }); + }); + it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => { let waitingIterators: number = 0; From e3d472c574d125c0da7da786738f84ff2ffeddbd Mon Sep 17 00:00:00 2001 From: ethanburrelldd <223327898+ethanburrelldd@users.noreply.github.com.> Date: Tue, 9 Sep 2025 16:48:19 -0400 Subject: [PATCH 2/2] second approach, isolated: true --- libraries/node-core-library/src/Async.ts | 31 +++++++++++++++---- .../node-core-library/src/test/Async.test.ts | 6 ++-- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index f9e422a4792..4893ed5591a 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -82,12 +82,14 @@ export interface IWeighted { * Must be a whole number greater than or equal to 0. */ weight: number; + + isolated?: boolean; } function toWeightedIterator( iterable: Iterable | AsyncIterable, useWeights?: boolean -): AsyncIterable<{ element: TEntry; weight: number }> { +): AsyncIterable<{ element: TEntry; weight: number; isolated: boolean }> { const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || (iterable as AsyncIterable)[Symbol.asyncIterator] @@ -99,7 +101,11 @@ function toWeightedIterator( // The await is necessary here, but TS will complain - it's a false positive. const { value, done } = await iterator.next(); return { - value: { element: value, weight: useWeights ? value?.weight : 1 }, + value: { + element: value, + weight: useWeights ? value?.weight : 1, + isolated: useWeights ? value?.isolated : false + }, done: !!done }; } @@ -184,7 +190,10 @@ export class Async { return result; } - private static async _forEachWeightedAsync( + private static async _forEachWeightedAsync< + TReturn, + TEntry extends { weight: number; element: TReturn; isolated: boolean } + >( iterable: AsyncIterable, callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined @@ -201,6 +210,7 @@ export class Async { let arrayIndex: number = 0; let iteratorIsComplete: boolean = false; let promiseHasResolvedOrRejected: boolean = false; + let isolatedTask: IteratorResult | undefined = undefined; async function queueOperationsAsync(): Promise { while ( @@ -213,7 +223,7 @@ export class Async { // there will be effectively no cap on the number of operations waiting. const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency; concurrentUnitsInProgress += limitedConcurrency; - const currentIteratorResult: IteratorResult = await iterator.next(); + const currentIteratorResult: IteratorResult = isolatedTask ?? (await iterator.next()); // eslint-disable-next-line require-atomic-updates iteratorIsComplete = !!currentIteratorResult.done; @@ -222,11 +232,20 @@ export class Async { Async.validateWeightedIterable(currentIteratorValue); // Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit. const weight: number = Math.min(currentIteratorValue.weight, concurrency); + const shouldIsolate: boolean = currentIteratorValue.isolated ?? false; // Wait until there's have enough capacity to run this job, this function will be re-entered as tasks call `onOperationCompletionAsync` - if (concurrentUnitsInProgress + weight - limitedConcurrency > concurrency) { - break; + if (shouldIsolate) { + if (concurrentUnitsInProgress - limitedConcurrency > 0) { + // eslint-disable-next-line require-atomic-updates + isolatedTask = currentIteratorResult; + // Remove the "lock" before breaking + concurrentUnitsInProgress -= limitedConcurrency; + break; + } } + // eslint-disable-next-line require-atomic-updates + isolatedTask = undefined; // Remove the "lock" from the concurrency check and only apply the current weight. // This should allow other operations to execute. diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 44b0eb46c72..55cb2859fc5 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -500,11 +500,11 @@ describe(Async.name, () => { let running: number = 0; const jobToMaxConcurrentJobsRunning: Record = {}; - const array: INumberWithWeight[] = [ + const array: (INumberWithWeight & { isolated?: boolean })[] = [ { n: 1, weight: 1 }, { n: 2, weight: 1 }, { n: 3, weight: 1 }, - { n: 4, weight: 10 }, // job that should run alone + { n: 4, weight: 10, isolated: true }, // job that should run alone { n: 5, weight: 1 }, { n: 6, weight: 1 }, { n: 7, weight: 1 } @@ -532,7 +532,7 @@ describe(Async.name, () => { expect(jobToMaxConcurrentJobsRunning[4]).toEqual(1); // Small jobs should be able to run concurrently with each other but not with heavyweight job - const nonIsolatedJobs = array.filter((job) => job.weight !== 10); + const nonIsolatedJobs = array.filter((job) => !!job.isolated); nonIsolatedJobs.forEach((job) => { expect(jobToMaxConcurrentJobsRunning[job.n]).toBeGreaterThanOrEqual(1); expect(jobToMaxConcurrentJobsRunning[job.n]).toBeLessThanOrEqual(6); // All small jobs could theoretically run together