6 changes: 3 additions & 3 deletions pkg/scheduler/actions/common/allocate.go
@@ -137,7 +137,7 @@ func allocateTask(ssn *framework.Session, stmt *framework.Statement, nodes []*no
}

log.InfraLogger.V(6).Infof("Looking for best node for task - Task: <%s/%s>, init requested: <%v>.",
- task.Namespace, task.Name, task.ResReq)
+ task.Namespace, task.Name, task.ResReqVector)

orderedNodes := ssn.OrderedNodesByTask(nodes, task)
for _, node := range orderedNodes {
@@ -175,7 +175,7 @@ func allocateTaskToNode(ssn *framework.Session, stmt *framework.Statement, task

func bindTaskToNode(ssn *framework.Session, stmt *framework.Statement, task *pod_info.PodInfo, node *node_info.NodeInfo) bool {
log.InfraLogger.V(6).Infof("Binding Task <%v/%v> to node <%v>, requires: %v GPUs",
- task.Namespace, task.Name, node.Name, task.ResReq)
+ task.Namespace, task.Name, node.Name, task.ResReqVector)

if err := stmt.Allocate(task, node.Name); err != nil {
log.InfraLogger.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", task.UID, node.Name, ssn.ID, err)
@@ -186,7 +186,7 @@ func bindTaskToNode(ssn *framework.Session, stmt *framework.Statement, task *pod

func pipelineTaskToNode(ssn *framework.Session, stmt *framework.Statement, task *pod_info.PodInfo, node *node_info.NodeInfo, updateTasksIfExistsOnNode bool) bool {
log.InfraLogger.V(6).Infof("Pipelining Task <%v/%v> to node <%v> requires: %v GPUs",
- task.Namespace, task.Name, node.Name, task.ResReq)
+ task.Namespace, task.Name, node.Name, task.ResReqVector)

if err := stmt.Pipeline(task, node.Name, updateTasksIfExistsOnNode); err != nil {
log.InfraLogger.V(6).Infof("Failed to pipeline Task %v on %v in Session %v", task.UID, node.Name, ssn.ID)
10 changes: 10 additions & 0 deletions pkg/scheduler/actions/common/feasible_nodes_test.go
@@ -78,6 +78,16 @@ var (
allNodeNames = append(gpuNodeNames, cpuNode.Name)
)

+ func init() {
+ vectorMap := resource_info.NewResourceVectorMap()
+ vectorMap.AddResource("nvidia.com/mig-1g.10gb")
+ for _, node := range allNodes {
+ node.VectorMap = vectorMap
+ node.IdleVector = node.Idle.ToVector(vectorMap)
+ node.ReleasingVector = node.Releasing.ToVector(vectorMap)
+ }
+ }
+
func TestFeasibleNodes(t *testing.T) {
tests := []struct {
name string
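Reviewer note: a minimal sketch of the vector API this PR appears to introduce, based only on the calls visible in the hunks (NewResourceVectorMap, AddResource, ToVector, GetIndex, Get). The wrapping function, the NewResource values, and the "gpu" index name reused from other hunks are illustrative assumptions, not part of this change.

```go
// Sketch only — assumes the resource_info API exactly as it appears in this diff.
func exampleVectorUsage() float64 {
	vectorMap := resource_info.NewResourceVectorMap()
	vectorMap.AddResource("nvidia.com/mig-1g.10gb") // register a non-default (MIG) resource

	// Convert a classic Resource into its dense vector form, indexed through vectorMap.
	idle := resource_info.NewResource(1000, 1024, 1)
	idleVector := idle.ToVector(vectorMap)

	// Individual resources are then read back by index.
	return idleVector.Get(vectorMap.GetIndex("gpu"))
}
```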
@@ -151,7 +151,7 @@ func (ig *AccumulatedIdleGpus) updateRequiredResources(scenario *scenario.ByNode

var requiredResources []float64
for _, pod := range scenario.PendingTasks() {
- requiredResources = append(requiredResources, pod.ResReq.GPUs()+float64(pod.ResReq.GetDraGpusCount()))
+ requiredResources = append(requiredResources, pod.ResReqVector.Get(pod.VectorMap.GetIndex("gpu")))
ig.pendingTasksInState[pod.UID] = true
}
sort.Sort(sort.Reverse(sort.Float64Slice(requiredResources)))
@@ -174,7 +174,7 @@ func (ig *AccumulatedIdleGpus) updateWithVictim(task *pod_info.PodInfo, minIdleG
}

prevMinRelevantValue := ig.nodesNameToIdleGpus[minIdleGpusRelevant]
- ig.nodesNameToIdleGpus[task.NodeName] += task.AcceptedResource.GPUs() + float64(task.AcceptedResource.GetDraGpusCount())
+ ig.nodesNameToIdleGpus[task.NodeName] += task.AcceptedResourceVector.Get(task.VectorMap.GetIndex("gpu"))

if ig.nodesNameToIdleGpus[task.NodeName] > prevMinRelevantValue {
ig.maxFreeGpuNodesSorted = orderedInsert(ig.maxFreeGpuNodesSorted, task.NodeName,
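Reviewer note: both hunks above replace a two-term GPU sum with a single vector read. A hedged sketch of that pattern follows; the helper function is hypothetical, and it assumes (not verified here) that the vector's "gpu" entry already aggregates plain and DRA-requested GPUs, which is why the explicit GetDraGpusCount() term disappears.

```go
// Sketch only — not part of this PR; illustrates the lookup used above.
func gpusRequested(pod *pod_info.PodInfo) float64 {
	gpuIdx := pod.VectorMap.GetIndex("gpu")
	// One vector read replaces pod.ResReq.GPUs() + float64(pod.ResReq.GetDraGpusCount()),
	// assuming the "gpu" entry already folds in DRA GPU requests.
	return pod.ResReqVector.Get(gpuIdx)
}
```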
@@ -25,6 +25,8 @@ import (
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/framework"
)

+ var testVectorMap = resource_info.NewResourceVectorMap()
+
func Test_orderedInsert(t *testing.T) {
type args[T cmp.Ordered] struct {
array []T
@@ -232,6 +234,8 @@ func TestAccumulatedIdleGpus_updateWithVictim(t *testing.T) {
2, 0,
),
},
+ AcceptedResourceVector: resource_info.NewResourceVectorWithValues(0, 0, 2, testVectorMap),
+ VectorMap: testVectorMap,
},
minIdleGpusRelevant: "n2",
},
@@ -255,6 +259,8 @@ func TestAccumulatedIdleGpus_updateWithVictim(t *testing.T) {
2, 0,
),
},
+ AcceptedResourceVector: resource_info.NewResourceVectorWithValues(0, 0, 2, testVectorMap),
+ VectorMap: testVectorMap,
},
minIdleGpusRelevant: "n2",
},
@@ -278,6 +284,8 @@ func TestAccumulatedIdleGpus_updateWithVictim(t *testing.T) {
2, 0,
),
},
+ AcceptedResourceVector: resource_info.NewResourceVectorWithValues(0, 0, 2, testVectorMap),
+ VectorMap: testVectorMap,
},
minIdleGpusRelevant: "n4",
},
@@ -309,8 +309,11 @@ func initializeSession(jobsCount, tasksPerJob int) (*framework.Session, []*pod_i
queueName := fmt.Sprintf("team-%d", jobID)
newJob, jobTasks := createJobWithTasks(tasksPerJob, jobID, queueName, v1.PodRunning, []v1.ResourceRequirements{requireOneGPU()})
jobs = append(jobs, newJob)
+ allocatedVector := newJob.Allocated.ToVector(vectorMap)
node.Allocatable.Add(newJob.Allocated)
+ node.AllocatableVector.Add(allocatedVector)
node.Idle.Add(newJob.Allocated)
+ node.IdleVector.Add(allocatedVector)
_ = node.AddTasksToNode(jobTasks, map[common_info.PodID]*pod_info.PodInfo{})
tasks = append(tasks, jobTasks...)
queues = append(queues, createQueue(queueName))
54 changes: 34 additions & 20 deletions pkg/scheduler/actions/utils/job_order_by_queue_test.go
@@ -39,6 +39,8 @@ const (
testPod = "p1"
)

+ var testVectorMap = resource_info.NewResourceVectorMap()
+
func TestNumericalPriorityWithinSameQueue(t *testing.T) {
ssn := newPrioritySession(t)

@@ -210,16 +212,16 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
pod_status.Allocated: {
"p1": {
- UID: "p1",
- AcceptedResource: resource_info.NewResourceRequirements(
- 1,
- 1000,
- 1024,
- ),
+ UID: "p1",
+ VectorMap: testVectorMap,
+ AcceptedResource: resource_info.NewResourceRequirements(1, 1000, 1024),
+ AcceptedResourceVector: resource_info.NewResourceRequirements(1, 1000, 1024).ToVector(testVectorMap),
},
},
},
- Allocated: resource_info.NewResource(1000, 1024, 1),
+ Allocated: resource_info.NewResource(1000, 1024, 1),
+ VectorMap: testVectorMap,
+ AllocatedVector: resource_info.NewResource(1000, 1024, 1).ToVector(testVectorMap),
},
"q1j2": {
Name: "q1j2",
@@ -228,16 +230,16 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
pod_status.Allocated: {
"p1": {
- UID: "p1",
- AcceptedResource: resource_info.NewResourceRequirements(
- 1,
- 1000,
- 1024,
- ),
+ UID: "p1",
+ VectorMap: testVectorMap,
+ AcceptedResource: resource_info.NewResourceRequirements(1, 1000, 1024),
+ AcceptedResourceVector: resource_info.NewResourceRequirements(1, 1000, 1024).ToVector(testVectorMap),
},
},
},
- Allocated: resource_info.NewResource(1000, 1024, 1),
+ Allocated: resource_info.NewResource(1000, 1024, 1),
+ VectorMap: testVectorMap,
+ AllocatedVector: resource_info.NewResource(1000, 1024, 1).ToVector(testVectorMap),
},
"q1j3": {
Name: "q1j3",
@@ -246,7 +248,9 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
pod_status.Allocated: {
"p1": {
- UID: "p1",
+ UID: "p1",
+ VectorMap: testVectorMap,
+ AcceptedResourceVector: resource_info.NewResourceRequirements(1, 1000, 1024).ToVector(testVectorMap),
AcceptedResource: resource_info.NewResourceRequirements(
1,
1000,
@@ -264,7 +268,9 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
pod_status.Allocated: {
"p1": {
- UID: "p1",
+ UID: "p1",
+ VectorMap: testVectorMap,
+ AcceptedResourceVector: resource_info.NewResourceRequirements(1, 1000, 1024).ToVector(testVectorMap),
AcceptedResource: resource_info.NewResourceRequirements(
1,
1000,
@@ -282,7 +288,9 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
pod_status.Allocated: {
"p1": {
- UID: "p1",
+ UID: "p1",
+ VectorMap: testVectorMap,
+ AcceptedResourceVector: resource_info.NewResourceRequirements(1, 1000, 1024).ToVector(testVectorMap),
AcceptedResource: resource_info.NewResourceRequirements(
1,
1000,
@@ -291,7 +299,9 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
},
},
},
- Allocated: resource_info.NewResource(1000, 1024, 1),
+ Allocated: resource_info.NewResource(1000, 1024, 1),
+ VectorMap: testVectorMap,
+ AllocatedVector: resource_info.NewResource(1000, 1024, 1).ToVector(testVectorMap),
},
"q2j3": {
Name: "q2j3",
@@ -300,7 +310,9 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
PodStatusIndex: map[pod_status.PodStatus]pod_info.PodsMap{
pod_status.Allocated: {
"p1": {
- UID: "p1",
+ UID: "p1",
+ VectorMap: testVectorMap,
+ AcceptedResourceVector: resource_info.NewResourceRequirements(1, 1000, 1024).ToVector(testVectorMap),
AcceptedResource: resource_info.NewResourceRequirements(
1,
1000,
@@ -309,7 +321,9 @@ func TestVictimQueue_PopNextJob(t *testing.T) {
},
},
},
- Allocated: resource_info.NewResource(1000, 1024, 1),
+ Allocated: resource_info.NewResource(1000, 1024, 1),
+ VectorMap: testVectorMap,
+ AllocatedVector: resource_info.NewResource(1000, 1024, 1).ToVector(testVectorMap),
},
},
expectedJobNames: []string{"q1j3", "q2j3", "q1j2", "q2j2", "q1j1", "q2j1"},
88 changes: 37 additions & 51 deletions pkg/scheduler/api/common_info/job_errors.go
@@ -10,8 +10,10 @@ import (
"strings"

enginev2alpha2 "github.com/NVIDIA/KAI-scheduler/pkg/apis/scheduling/v2alpha2"
"github.com/NVIDIA/KAI-scheduler/pkg/common/constants"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/resource_info"
"github.com/dustin/go-humanize"
v1 "k8s.io/api/core/v1"
)

const (
@@ -141,68 +143,52 @@ func (f *TopologyFitError) DetailedMessage() string {

func NewTopologyInsufficientResourcesError(
jobName, subGroupName, namespace, domainID string,
- resourceRequested *resource_info.Resource, availableResource *resource_info.Resource,
+ resourceRequested resource_info.ResourceVector, availableResource resource_info.ResourceVector,
+ vectorMap *resource_info.ResourceVectorMap,
) *TopologyFitError {
var shortMessages []string
var detailedMessages []string

- if len(resourceRequested.MigResources()) > 0 {
- for migProfile, quant := range resourceRequested.MigResources() {
- availableMigProfilesQuant := int64(0)
- if _, found := availableResource.ScalarResources()[migProfile]; found {
- availableMigProfilesQuant = availableResource.ScalarResources()[migProfile]
- }
- if availableMigProfilesQuant < quant {
- detailedMessages = append(detailedMessages,
- fmt.Sprintf("%s didn't have enough resource: %s, requested: %d, available: %d",
- domainID, migProfile, quant, availableMigProfilesQuant))
- shortMessages = append(shortMessages, fmt.Sprintf("node-group(s) didn't have enough of mig profile: %s",
- migProfile))
- }
+ for i := 0; i < vectorMap.Len(); i++ {
+ resourceName := vectorMap.ResourceAt(i)
+ requested := resourceRequested.Get(i)
+ available := availableResource.Get(i)
+ if requested <= available {
+ continue
+ }
}
- } else {
- requestedGPUs := resourceRequested.GPUs()
- availableGPUs := availableResource.GPUs()
- if requestedGPUs > availableGPUs {
+
+ if resource_info.IsMigResource(v1.ResourceName(resourceName)) {
+ detailedMessages = append(detailedMessages,
+ fmt.Sprintf("%s didn't have enough resource: %s, requested: %d, available: %d",
+ domainID, resourceName, int64(requested), int64(available)))
+ shortMessages = append(shortMessages, fmt.Sprintf("node-group(s) didn't have enough of mig profile: %s",
+ resourceName))
+ } else if resourceName == constants.GpuResource {
detailedMessages = append(detailedMessages, fmt.Sprintf("%s didn't have enough resource: GPUs, requested: %s, available: %s",
domainID,
- strconv.FormatFloat(requestedGPUs, 'g', 3, 64),
- strconv.FormatFloat(availableGPUs, 'g', 3, 64),
+ strconv.FormatFloat(requested, 'g', 3, 64),
+ strconv.FormatFloat(available, 'g', 3, 64),
))
shortMessages = append(shortMessages, "node-group(s) didn't have enough resources: GPUs")
- }
- }
-
- requestedCPUs := int64(resourceRequested.Cpu())
- availableCPUs := int64(availableResource.Cpu())
- if requestedCPUs > availableCPUs {
- detailedMessages = append(detailedMessages, fmt.Sprintf("%s didn't have enough resources: CPU cores, requested: %s, available: %s",
- domainID,
- humanize.FtoaWithDigits(resourceRequested.Cpu()/resource_info.MilliCPUToCores, 3),
- humanize.FtoaWithDigits(availableResource.Cpu()/resource_info.MilliCPUToCores, 3),
- ))
- shortMessages = append(shortMessages, "node-group(s) didn't have enough resources: CPU cores")
- }
-
- if resourceRequested.Memory() > availableResource.Memory() {
- detailedMessages = append(detailedMessages, fmt.Sprintf("%s didn't have enough resources: memory, requested: %s, available: %s",
- domainID,
- humanize.FtoaWithDigits(resourceRequested.Memory()/resource_info.MemoryToGB, 3),
- humanize.FtoaWithDigits(availableResource.Memory()/resource_info.MemoryToGB, 3),
- ))
- shortMessages = append(shortMessages, "node-group(s) didn't have enough resources: memory")
- }
-
- for requestedResourceName, requestedResourceQuant := range resourceRequested.ScalarResources() {
- availableResourceQuant := int64(0)
- if _, found := availableResource.ScalarResources()[requestedResourceName]; found {
- availableResourceQuant = availableResource.ScalarResources()[requestedResourceName]
- }
- if availableResourceQuant < requestedResourceQuant {
+ } else if resourceName == string(v1.ResourceCPU) {
+ detailedMessages = append(detailedMessages, fmt.Sprintf("%s didn't have enough resources: CPU cores, requested: %s, available: %s",
+ domainID,
+ humanize.FtoaWithDigits(requested/resource_info.MilliCPUToCores, 3),
+ humanize.FtoaWithDigits(available/resource_info.MilliCPUToCores, 3),
+ ))
+ shortMessages = append(shortMessages, "node-group(s) didn't have enough resources: CPU cores")
+ } else if resourceName == string(v1.ResourceMemory) {
+ detailedMessages = append(detailedMessages, fmt.Sprintf("%s didn't have enough resources: memory, requested: %s, available: %s",
+ domainID,
+ humanize.FtoaWithDigits(requested/resource_info.MemoryToGB, 3),
+ humanize.FtoaWithDigits(available/resource_info.MemoryToGB, 3),
+ ))
+ shortMessages = append(shortMessages, "node-group(s) didn't have enough resources: memory")
+ } else {
detailedMessages = append(detailedMessages, fmt.Sprintf("%s didn't have enough resource: %s, requested: %d, available: %d",
- domainID, requestedResourceName, requestedResourceQuant, availableResourceQuant))
+ domainID, resourceName, int64(requested), int64(available)))
shortMessages = append(shortMessages, fmt.Sprintf("node-group(s) didn't have enough resources: %s",
- requestedResourceName))
+ resourceName))
}
}
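
Reviewer note: the rewritten constructor now walks every resource registered in the vector map once, instead of special-casing MIG, GPU, CPU, and memory. A hedged sketch of how a caller might assemble its arguments, written as if in the same package; the helper and its inputs are hypothetical, only the new signature and the ToVector conversion come from this diff.

```go
// Sketch only — caller-side view of the new signature; values and wiring are illustrative.
func domainFitError(
	jobName, subGroupName, namespace, domainID string,
	jobRequested, domainAvailable *resource_info.Resource,
	vectorMap *resource_info.ResourceVectorMap,
) *TopologyFitError {
	requested := jobRequested.ToVector(vectorMap)    // ToVector as used in the test hunks above
	available := domainAvailable.ToVector(vectorMap)
	return NewTopologyInsufficientResourcesError(
		jobName, subGroupName, namespace, domainID,
		requested, available, vectorMap)
}
```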
