@@ -41,9 +41,10 @@ def handle(self, *args, **options):
            ('quantity_default_value', 'Int'),
            ('quantity_label', 'Text'),
            ('xdmod_resource', 'Text'),
            ('Features', 'Text'),
            # ('eula', 'Text'),
            # ('OnDemand','Yes/No'),
            # ('ServiceEnd', 'Date'),
            ('ServiceEnd', 'Date'),
            # ('ServiceStart', 'Date'),
            ('slurm_cluster', 'Text'),
            ('slurm_specs', 'Attribute Expanded Text'),
2 changes: 2 additions & 0 deletions coldfront/core/resource/models.py
@@ -3,7 +3,9 @@
from django.db import models
from django.conf import settings
from django.contrib.auth.models import Group
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from model_utils.fields import AutoLastModifiedField
from model_utils.models import TimeStampedModel
from simple_history.models import HistoricalRecords

212 changes: 212 additions & 0 deletions coldfront/plugins/slurm/management/commands/slurm_manage_resources.py
@@ -0,0 +1,212 @@
import logging
import os
import re
from cProfile import Profile
from django.utils import timezone

from django.core.management.base import BaseCommand
from simple_history.utils import bulk_update_with_history, bulk_create_with_history

from coldfront.core.resource.models import ResourceType, ResourceAttribute, ResourceAttributeType, AttributeType, Resource
from coldfront.core.project.models import Project
from coldfront.plugins.slurm.utils import slurm_get_nodes_info

logger = logging.getLogger(__name__)
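# Each row handled below is a dict built from sinfo output (see slurm_get_nodes_info()
# and SLURM_CMD_SINFO_NODES in coldfront/plugins/slurm/utils.py), keyed by the
# lowercased column headers. A hypothetical example row:
# {'nodelist': 'node001', 'partition': 'gpu', 'gres': 'gpu:nvidia_a100:4(S:0-1)',
#  'avail': 'up', 'cpus': '64', 's:c:t': '2:32:1', 'avail_features': 'amd,gpu',
#  'groups': 'cluster_users,slurm-admin'}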

class Command(BaseCommand):
    help = 'Manage Slurm resources from sinfo output'

    def get_output_from_file(self, file_path):
        """Yield one dict per node from a saved sinfo output file (dev mode)."""
        try:
            keys = None
            with open(file_path, 'r') as output_file:
                for line in output_file:
                    if keys is None:
                        keys = re.sub(r'\s+', ' ', line).strip().lower().split(' ')
                    else:
                        values = re.sub(r'\s+', ' ', line).strip().split(' ')
                        yield dict(zip(keys, values))
        except FileNotFoundError:
            logger.error(f"File at {file_path} does not exist. Can't simulate output.")
        except IOError as e:
            logger.error(f"An error occurred while reading {file_path}: {e}")

    def add_arguments(self, parser):
        parser.add_argument("-e", "--environment", help="Environment, use dev to simulate output")
        parser.add_argument('--profile', action='store_true', default=False)
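        # Typical invocations (assuming the standard manage.py entry point):
        #   python manage.py slurm_manage_resources            # query sinfo directly
        #   python manage.py slurm_manage_resources -e dev     # parse the bundled sinfo.txt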

    def handle(self, *args, **options):
        if options.get('profile', False):
            profiler = Profile()
            profiler.runcall(self._handle, *args, **options)
            profiler.print_stats()
        else:
            self._handle(*args, **options)

    def _handle(self, *args, **options):
        def calculate_gpu_count(gres_value):
            # GRES strings look like 'gpu:nvidia_a100:4(S:0-1),...' or '(null)';
            # sum the per-entry counts.
            if 'null' in gres_value:
                return 0
            gpu_list = gres_value.split('),')
            return sum(int(gpu_info.split(':')[2].replace('(S', '')) for gpu_info in gpu_list)

        def calculate_cpu_count(row):
            # 's:c:t' is sockets:cores:threads; take the core count.
            if row.get('s:c:t', None) is None:
                return 0
            cpu_count = row.get('s:c:t').split(':')[1]
            return int(cpu_count)

        def calculate_owner_value(project_list, row):
            # Owner is the first Slurm group matching a ColdFront project title;
            # nodes shared by cluster_users and slurm-admin are attributed to FASRC.
            owner_name = ''
            project_name_list = [project.title for project in project_list]
            owner_lists = row.get('groups', '').split(',')
            owner_project = [name_owner for name_owner in owner_lists if name_owner in project_name_list]
            if len(owner_project) > 0:
                return owner_project[0]
            if {'cluster_users', 'slurm-admin'}.issubset(set(owner_lists)):
                return 'FASRC'
            return owner_name

        def get_cluster():
            return Resource.objects.get(resource_type__name='Cluster')


        env = options['environment'] or 'production'
        if 'dev' in env:
            output = self.get_output_from_file(os.path.join(os.getcwd(), 'coldfront/plugins/slurm/management/commands/sinfo.txt'))
        else:
            output = slurm_get_nodes_info()
        logger.debug(f'Running in {env} mode')
        modify_history_date = timezone.now()
        project_list = Project.objects.all()
        current_cluster = get_cluster()
        compute_node = ResourceType.objects.get(name='Compute Node')
        attribute_type_name_list = ['GPU Count', 'Core Count', 'Features', 'Owner', 'ServiceEnd']
        partition_resource_type = ResourceType.objects.get(name='Cluster Partition')
        gpu_count_attribute_type = ResourceAttributeType.objects.get(name='GPU Count')
        core_count_attribute_type = ResourceAttributeType.objects.get(name='Core Count')
        features_attribute_type = ResourceAttributeType.objects.get(name='Features')
        owner_attribute_type = ResourceAttributeType.objects.get(name='Owner')
        service_end_attribute_type = ResourceAttributeType.objects.get(name='ServiceEnd')
        # Cache pks of existing Compute Node attributes, keyed by
        # '<node name> <attribute type name>', so each row can be routed to a bulk
        # update or a bulk create without per-row queries.
        existing_resource_attributes = list(ResourceAttribute.objects.filter(
            resource_attribute_type__name__in=attribute_type_name_list,
            resource__resource_type__name='Compute Node'
        ).values_list('pk', 'resource__name', 'resource_attribute_type__name'))
        existing_resource_attributes_check = [f'{resource_att[1]} {resource_att[2]}' for resource_att in existing_resource_attributes]
        existing_resource_attributes_pk_map = {f'{resource_att[1]} {resource_att[2]}': resource_att[0] for resource_att in existing_resource_attributes}
        processed_resources = set()
        bulk_update_resource_attribute = []
        bulk_create_resource_attribute = []
        bulk_update_resource = []
        processed_resource_attribute = []
        for row in output:
            new_resource, created = Resource.objects.get_or_create(
                name=row['nodelist'],
                defaults={
                    'is_allocatable': False,
                    'resource_type': compute_node,
                    'parent_resource': current_cluster
                }
            )
            Resource.objects.get_or_create(name=row['partition'], defaults={'resource_type': partition_resource_type})

            # Stage each node attribute for a bulk update (existing pk) or a bulk
            # create, deduplicating repeated sinfo rows for the same node.
            gpu_count = ResourceAttribute(resource_attribute_type=gpu_count_attribute_type, resource=new_resource, value=calculate_gpu_count(row['gres']))
            gpu_count_key = f"{row['nodelist']} {gpu_count_attribute_type.name}"
            if gpu_count_key in existing_resource_attributes_check:
                gpu_count.pk = existing_resource_attributes_pk_map[gpu_count_key]
                bulk_update_resource_attribute.append(gpu_count)
            elif gpu_count_key not in processed_resource_attribute:
                bulk_create_resource_attribute.append(gpu_count)
                processed_resource_attribute.append(gpu_count_key)

            core_count = ResourceAttribute(resource_attribute_type=core_count_attribute_type, resource=new_resource, value=calculate_cpu_count(row))
            core_count_key = f"{row['nodelist']} {core_count_attribute_type.name}"
            if core_count_key in existing_resource_attributes_check:
                core_count.pk = existing_resource_attributes_pk_map[core_count_key]
                bulk_update_resource_attribute.append(core_count)
            elif core_count_key not in processed_resource_attribute:
                bulk_create_resource_attribute.append(core_count)
                processed_resource_attribute.append(core_count_key)

            features = ResourceAttribute(resource_attribute_type=features_attribute_type, resource=new_resource, value=row.get('avail_features', '(null)'))
            features_key = f"{row['nodelist']} {features_attribute_type.name}"
            if features_key in existing_resource_attributes_check:
                features.pk = existing_resource_attributes_pk_map[features_key]
                bulk_update_resource_attribute.append(features)
            elif features_key not in processed_resource_attribute:
                bulk_create_resource_attribute.append(features)
                processed_resource_attribute.append(features_key)

            owner = ResourceAttribute(resource_attribute_type=owner_attribute_type, resource=new_resource, value=calculate_owner_value(project_list, row))
            owner_key = f"{row['nodelist']} {owner_attribute_type.name}"
            if owner_key in existing_resource_attributes_check:
                owner.pk = existing_resource_attributes_pk_map[owner_key]
                bulk_update_resource_attribute.append(owner)
            elif owner_key not in processed_resource_attribute:
                bulk_create_resource_attribute.append(owner)
                processed_resource_attribute.append(owner_key)

            if new_resource.is_available is False:
                # The node reappeared in sinfo after being decommissioned: flag it
                # available again and clear its ServiceEnd attribute (assumed to exist
                # from the earlier decommission pass).
                new_resource.is_available = True
                bulk_update_resource.append(new_resource)
                service_end_pk = existing_resource_attributes_pk_map[f"{row['nodelist']} {service_end_attribute_type.name}"]
                bulk_update_resource_attribute.append(
                    ResourceAttribute(
                        resource=new_resource, value=None,
                        resource_attribute_type=service_end_attribute_type,
                        pk=service_end_pk,
                        modified=modify_history_date
                    )
                )
            processed_resources.add(new_resource.name)
        try:
            logger.debug(f'Updating {len(bulk_update_resource_attribute)} ResourceAttribute records')
            bulk_update_with_history(
                bulk_update_resource_attribute, ResourceAttribute, ['value'],
                batch_size=500, default_change_reason='slurm_manage_resource command',
                default_date=modify_history_date
            )
            logger.debug(f'Updating {len(bulk_update_resource)} Resource records')
            bulk_update_with_history(
                bulk_update_resource, Resource, ['is_available'], batch_size=500,
                default_change_reason='slurm_manage_resource command', default_date=modify_history_date
            )
            logger.debug(f'Creating {len(bulk_create_resource_attribute)} ResourceAttribute records')
            bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command')
        except Exception as e:
            logger.error(f'Error processing resources info: {str(e)}')
            raise
        bulk_update_resource_attribute = []
        bulk_create_resource_attribute = []
        bulk_update_resource = []
        # Any previously available compute node missing from this sinfo run is
        # decommissioned: mark it unavailable and stamp a ServiceEnd attribute.
        for resource_to_delete in Resource.objects.exclude(name__in=list(processed_resources)).filter(is_available=True, resource_type=compute_node):
            resource_to_delete.is_available = False
            bulk_update_resource.append(resource_to_delete)
            service_end = ResourceAttribute(resource=resource_to_delete, value=modify_history_date, resource_attribute_type=service_end_attribute_type)
            service_end_key = f'{resource_to_delete.name} {service_end_attribute_type.name}'
            if service_end_key in existing_resource_attributes_check:
                service_end.pk = existing_resource_attributes_pk_map[service_end_key]
                bulk_update_resource_attribute.append(service_end)
            else:
                bulk_create_resource_attribute.append(service_end)
        try:
            logger.debug(f'Decommissioning {len(bulk_update_resource)} Resource records')
            bulk_update_with_history(
                bulk_update_resource, Resource, ['is_available'], batch_size=500,
                default_change_reason='slurm_manage_resource command', default_date=modify_history_date
            )
            logger.debug(f'Creating {len(bulk_create_resource_attribute)} ServiceEnd ResourceAttribute records')
            bulk_create_with_history(bulk_create_resource_attribute, ResourceAttribute, batch_size=500, default_change_reason='slurm_manage_resource command')
            logger.debug(f'Updating {len(bulk_update_resource_attribute)} ServiceEnd ResourceAttribute records')
            bulk_update_with_history(
                bulk_update_resource_attribute, ResourceAttribute, ['value'], batch_size=500,
                default_change_reason='slurm_manage_resource command', default_date=modify_history_date
            )
        except Exception as e:
            logger.error(f'Error cleaning up resources: {str(e)}')
            raise
14 changes: 14 additions & 0 deletions coldfront/plugins/slurm/utils.py
@@ -1,4 +1,5 @@
import logging
import re
import shlex
import struct
import subprocess
@@ -34,6 +35,7 @@
SLURM_CMD_CHECK_ASSOCIATION = SLURM_SACCTMGR_PATH + ' list associations User={} Cluster={} Account={} Format=Cluster,Account,User,QOS -P'
SLURM_CMD_BLOCK_ACCOUNT = SLURM_SACCTMGR_PATH + ' -Q -i modify account {} where Cluster={} set GrpSubmitJobs=0'
SLURM_CMD_DUMP_CLUSTER = SLURM_SACCTMGR_PATH + ' dump {} file={}'
SLURM_CMD_SINFO_NODES = 'sinfo -N -r --format="%19N %23P %66G %5a %5c %8z %65f %50g"'
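# Field map for the format string above (column headers become dict keys downstream):
# %N NODELIST, %P PARTITION, %G GRES, %a AVAIL, %c CPUS, %z S:C:T (sockets:cores:threads),
# %f AVAIL_FEATURES, %g GROUPS; -N prints one row per node, -r lists only responding nodes.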
SLURM_CMD_LIST_PARTITIONS = SLURM_SCONTROL_PATH + ' show partitions'
SLURM_CMD_GET_USER_INFO = SLURM_SSHARE_PATH + ' -u {} -A {} -o Account,User,RawShares,NormShares,RawUsage,FairShare --parsable'

@@ -145,6 +147,18 @@ def slurm_block_account(cluster, account, noop=False):
    cmd = SLURM_CMD_BLOCK_ACCOUNT.format(shlex.quote(account), shlex.quote(cluster))
    _run_slurm_cmd(cmd, noop=noop)

def slurm_get_nodes_info():
    """Run sinfo and return a list of dicts, one per node, keyed by lowercased headers."""
    def table_to_dict_list(table_info):
        keys = re.sub(r'\s+', ' ', table_info[0]).strip().lower().split(' ')
        values = [re.sub(r'\s+', ' ', row).strip().split(' ') for row in table_info[1:] if len(row) > 0]
        return [dict(zip(keys, row)) for row in values if len(row) > 0]

    cmd = SLURM_CMD_SINFO_NODES
    nodes_info = _run_slurm_cmd(cmd, noop=False)
    nodes_info = nodes_info.decode('utf-8').split('\n')
    return table_to_dict_list(nodes_info)
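# Example return value (hypothetical node):
# [{'nodelist': 'node001', 'partition': 'gpu', 'gres': 'gpu:nvidia_a100:4(S:0-1)',
#   'avail': 'up', 'cpus': '64', 's:c:t': '2:32:1', 'avail_features': 'amd,gpu',
#   'groups': 'cluster_users,slurm-admin'}]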

def slurm_check_assoc(user, cluster, account):
    cmd = SLURM_CMD_CHECK_ASSOCIATION.format(
        shlex.quote(user), shlex.quote(cluster), shlex.quote(account)