Skip to content

Commit f67b833

Browse files
committed
check used resources on all envs for disk and ram
1 parent cd6fbfd commit f67b833

File tree

3 files changed

+71
-15
lines changed

3 files changed

+71
-15
lines changed

src/components/c2d/compute_engine_base.ts

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ export abstract class C2DEngine {
317317
CORE_LOGGER.error('Failed to get running jobs:' + e.message)
318318
}
319319

320-
const envResourceIds = new Set((env.resources || []).map((r) => r.id))
320+
const envResourceMap = new Map((env.resources || []).map((r) => [r.id, r]))
321321

322322
let totalJobs = 0
323323
let totalFreeJobs = 0
@@ -354,9 +354,12 @@ export abstract class C2DEngine {
354354

355355
if (isRunning) {
356356
for (const resource of job.resources) {
357-
if (envResourceIds.has(resource.id)) {
358-
const isCpu = resource.id === 'cpu'
359-
if (isCpu && !isThisEnv) continue // CPU is partitioned, skip other envs
357+
const envRes = envResourceMap.get(resource.id)
358+
if (envRes) {
359+
// GPUs are shared-exclusive: inUse tracked globally across all envs
360+
// Everything else (cpu, ram, disk) is per-env exclusive
361+
const isSharedExclusive = envRes.type === 'gpu'
362+
if (!isSharedExclusive && !isThisEnv) continue
360363
if (!(resource.id in usedResources)) usedResources[resource.id] = 0
361364
usedResources[resource.id] += resource.amount
362365
if (job.isFree) {
@@ -381,12 +384,41 @@ export abstract class C2DEngine {
381384
}
382385
}
383386

387+
private checkGlobalResourceAvailability(
388+
allEnvironments: ComputeEnvironment[],
389+
resourceId: string,
390+
amount: number,
391+
isFree: boolean
392+
) {
393+
let globalUsed = 0
394+
let globalTotal = 0
395+
for (const e of allEnvironments) {
396+
const res = isFree
397+
? e.free
398+
? this.getResource(e.free.resources, resourceId)
399+
: null
400+
: this.getResource(e.resources, resourceId)
401+
if (res) {
402+
globalTotal += res.total || 0
403+
globalUsed += res.inUse || 0
404+
}
405+
}
406+
const globalRemainder = globalTotal - globalUsed
407+
if (globalRemainder < amount) {
408+
const suffix = isFree ? ' for free' : ''
409+
throw new Error(
410+
`Not enough available ${resourceId}${suffix} globally (remaining: ${globalRemainder}, requested: ${amount})`
411+
)
412+
}
413+
}
414+
384415
// overridden by each engine if required
385416
// eslint-disable-next-line require-await
386417
public async checkIfResourcesAreAvailable(
387418
resourcesRequest: ComputeResourceRequest[],
388419
env: ComputeEnvironment,
389-
isFree: boolean
420+
isFree: boolean,
421+
allEnvironments?: ComputeEnvironment[]
390422
) {
391423
// Filter out resources with amount 0 as they're not actually being requested
392424
const activeResources = resourcesRequest.filter((r) => r.amount > 0)
@@ -396,12 +428,33 @@ export abstract class C2DEngine {
396428
if (!envResource) throw new Error(`No such resource ${request.id}`)
397429
if (envResource.total - envResource.inUse < request.amount)
398430
throw new Error(`Not enough available ${request.id}`)
431+
432+
// Global check for non-GPU resources (cpu, ram, disk are per-env exclusive)
433+
// GPUs are shared-exclusive so their inUse already reflects global usage
434+
if (allEnvironments && envResource.type !== 'gpu') {
435+
this.checkGlobalResourceAvailability(
436+
allEnvironments,
437+
request.id,
438+
request.amount,
439+
false
440+
)
441+
}
442+
399443
if (isFree) {
400444
if (!env.free) throw new Error(`No free resources`)
401445
envResource = this.getResource(env.free?.resources, request.id)
402446
if (!envResource) throw new Error(`No such free resource ${request.id}`)
403447
if (envResource.total - envResource.inUse < request.amount)
404448
throw new Error(`Not enough available ${request.id} for free`)
449+
450+
if (allEnvironments && envResource.type !== 'gpu') {
451+
this.checkGlobalResourceAvailability(
452+
allEnvironments,
453+
request.id,
454+
request.amount,
455+
true
456+
)
457+
}
405458
}
406459
}
407460
if ('maxJobs' in env && env.maxJobs && env.runningJobs + 1 > env.maxJobs) {

src/components/c2d/compute_engine_docker.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1553,12 +1553,11 @@ export class C2DEngineDocker extends C2DEngine {
15531553
}
15541554
// check if resources are available now
15551555
try {
1556-
const env = await this.getComputeEnvironment(
1557-
job.payment && job.payment.chainId ? job.payment.chainId : null,
1558-
job.environment,
1559-
null
1560-
)
1561-
await this.checkIfResourcesAreAvailable(job.resources, env, job.isFree)
1556+
const chainId = job.payment && job.payment.chainId ? job.payment.chainId : null
1557+
const allEnvs = await this.getComputeEnvironments(chainId)
1558+
const env = allEnvs.find((e) => e.id === job.environment)
1559+
if (!env) throw new Error(`Environment ${job.environment} not found`)
1560+
await this.checkIfResourcesAreAvailable(job.resources, env, job.isFree, allEnvs)
15621561
} catch (err) {
15631562
// resources are still not available
15641563
return

src/components/core/compute/startCompute.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
import { EncryptMethod } from '../../../@types/fileObject.js'
3030
import {
3131
ComputeAccessList,
32+
ComputeEnvironment,
3233
ComputeResourceRequestWithPrice
3334
} from '../../../@types/C2D/C2D.js'
3435
// import { verifyProviderFees } from '../utils/feesHandler.js'
@@ -119,8 +120,10 @@ export class PaidComputeStartHandler extends CommonComputeHandler {
119120
}
120121
}
121122

123+
let allEnvs: ComputeEnvironment[]
122124
try {
123-
env = await engine.getComputeEnvironment(null, task.environment)
125+
allEnvs = await engine.getComputeEnvironments()
126+
env = allEnvs.find((e) => e.id === task.environment)
124127
if (!env) {
125128
return {
126129
stream: null,
@@ -150,7 +153,7 @@ export class PaidComputeStartHandler extends CommonComputeHandler {
150153
}
151154
}
152155
try {
153-
await engine.checkIfResourcesAreAvailable(task.resources, env, false)
156+
await engine.checkIfResourcesAreAvailable(task.resources, env, false, allEnvs)
154157
} catch (e) {
155158
if (task.queueMaxWaitTime > 0) {
156159
CORE_LOGGER.verbose(
@@ -885,7 +888,8 @@ export class FreeComputeStartHandler extends CommonComputeHandler {
885888
}
886889
}
887890
}
888-
const env = await engine.getComputeEnvironment(null, task.environment)
891+
const allFreeEnvs = await engine.getComputeEnvironments()
892+
const env = allFreeEnvs.find((e) => e.id === task.environment)
889893
if (!env) {
890894
return {
891895
stream: null,
@@ -931,7 +935,7 @@ export class FreeComputeStartHandler extends CommonComputeHandler {
931935
}
932936
}
933937
try {
934-
await engine.checkIfResourcesAreAvailable(task.resources, env, true)
938+
await engine.checkIfResourcesAreAvailable(task.resources, env, true, allFreeEnvs)
935939
} catch (e) {
936940
if (task.queueMaxWaitTime > 0) {
937941
CORE_LOGGER.verbose(

0 commit comments

Comments
 (0)