Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 121 additions & 85 deletions src/slurmdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import date, datetime, timedelta
from calendar import monthrange
from itertools import product
from typing import Any, Dict, List, Optional, Tuple, Union


try:
Expand Down Expand Up @@ -590,46 +591,37 @@ def fetch_invoices(self, start_date=None, end_date=None):
for r in rows
]

def export_summary(self, start_time, end_time):
"""Export a summary of usage and costs.

Rates represent a fixed cost per core-hour (for example, dollars
per core-hour) and must be non-negative. ``discount`` values are
fractional percentages, where ``0.2`` means a 20% discount, and
they must fall between 0 and 1, inclusive. A :class:`ValueError`
is raised if these constraints are violated.
"""

usage, totals = self.aggregate_usage(start_time, end_time)
summary = {
'summary': {},
'details': [],
'daily': [],
'monthly': [],
'yearly': [],
'invoices': [],
}
total_ch = 0.0
total_gpu = 0.0
total_cost = 0.0

rates_path = os.path.join(os.path.dirname(__file__), 'rates.json')
def _load_rates(self, rates_file: Optional[str]) -> Dict[str, Any]:
path = rates_file or os.path.join(os.path.dirname(__file__), 'rates.json')
try:
with open(rates_path) as fh:
rates_cfg = json.load(fh)
with open(path) as fh:
return json.load(fh)
except OSError as e:
logging.warning("Unable to read rates file %s: %s", rates_path, e)
rates_cfg = {}
logging.warning("Unable to read rates file %s: %s", path, e)
return {}
except json.JSONDecodeError as e:
logging.error("Failed to parse rates file %s: %s", rates_path, e)
logging.error("Failed to parse rates file %s: %s", path, e)
raise

def _validate_cluster_cores(self, resources: Dict[str, Any]) -> int:
cores = resources.get('cores')
if not isinstance(cores, (int, float)) or cores <= 0:
raise ValueError(f"Invalid cluster core count {cores}")
return int(cores)

def _build_account_details(
self,
usage: Dict[str, Dict[str, Any]],
rates_cfg: Dict[str, Any],
) -> Tuple[List[Dict[str, Any]], float, float, float]:
default_rate = rates_cfg.get('defaultRate', 0.01)
default_gpu_rate = rates_cfg.get('defaultGpuRate', 0.0)
overrides = rates_cfg.get('overrides', {})
historical = rates_cfg.get('historicalRates', {})
gpu_historical = rates_cfg.get('historicalGpuRates', {})
resources = self.cluster_resources()
cluster_cores = resources.get('cores')

details: List[Dict[str, Any]] = []
total_ch = total_gpu = total_cost = 0.0

for month, accounts in usage.items():
base_rate = historical.get(month, default_rate)
Expand All @@ -643,23 +635,19 @@ def export_summary(self, start_time, end_time):
if rate < 0:
raise ValueError(f"Invalid rate {rate} for account {account}")
if gpu_rate < 0:
raise ValueError(
f"Invalid GPU rate {gpu_rate} for account {account}"
)
raise ValueError(f"Invalid GPU rate {gpu_rate} for account {account}")
if not 0 <= discount <= 1:
raise ValueError(
f"Invalid discount {discount} for account {account}"
)
raise ValueError(f"Invalid discount {discount} for account {account}")

acct_cost = vals['core_hours'] * rate + vals.get('gpu_hours', 0.0) * gpu_rate
if 0 < discount < 1:
acct_cost *= 1 - discount
users = []
users: List[Dict[str, Any]] = []
for user, uvals in vals.get('users', {}).items():
u_cost = uvals['core_hours'] * rate
if 0 < discount < 1:
u_cost *= 1 - discount
jobs = []
jobs: List[Dict[str, Any]] = []
for job, jvals in uvals.get('jobs', {}).items():
j_cost = jvals['core_hours'] * rate
if 0 < discount < 1:
Expand Down Expand Up @@ -687,7 +675,7 @@ def export_summary(self, start_time, end_time):
'jobs': jobs,
}
)
summary['details'].append(
details.append(
{
'account': account,
'core_hours': round(vals['core_hours'], 2),
Expand All @@ -699,72 +687,120 @@ def export_summary(self, start_time, end_time):
total_ch += vals['core_hours']
total_gpu += vals.get('gpu_hours', 0.0)
total_cost += acct_cost
start_dt = (
_fromisoformat(start_time)
if isinstance(start_time, str)
else datetime.fromtimestamp(start_time)
)
end_dt = (
_fromisoformat(end_time)
if isinstance(end_time, str)
else datetime.fromtimestamp(end_time)
)
summary['summary'] = {
'period': f"{start_dt.strftime('%Y-%m-%d')} to {end_dt.strftime('%Y-%m-%d')}",
'total': round(total_cost, 2),
'core_hours': round(total_ch, 2),
'gpu_hours': round(total_gpu, 2),
'cluster': resources,
}
if cluster_cores:
start_date = start_dt.date()
end_date = end_dt.date()
current = date(start_date.year, start_date.month, 1)
end_marker = date(end_date.year, end_date.month, 1)
projected_revenue = 0.0
while current <= end_marker:
days_in_month = monthrange(current.year, current.month)[1]
month_start = date(current.year, current.month, 1)
month_end = date(current.year, current.month, days_in_month)
overlap_start = max(month_start, start_date)
overlap_end = min(month_end, end_date)
if overlap_start <= overlap_end:
days = (overlap_end - overlap_start).days + 1
rate = historical.get(current.strftime('%Y-%m'), default_rate)
projected_revenue += cluster_cores * 24 * days * rate
if current.month == 12:
current = date(current.year + 1, 1, 1)
else:
current = date(current.year, current.month + 1, 1)
summary['summary']['projected_revenue'] = round(projected_revenue, 2)
summary['daily'] = [
return details, total_ch, total_gpu, total_cost

def _build_time_series(
self, totals: Dict[str, Any]
) -> Tuple[List[Dict[str, float]], List[Dict[str, float]], List[Dict[str, float]]]:
daily = [
{
'date': d,
'core_hours': round(totals['daily'].get(d, 0.0), 2),
'gpu_hours': round(totals.get('daily_gpu', {}).get(d, 0.0), 2),
}
for d in sorted(set(totals['daily']) | set(totals.get('daily_gpu', {})))
]
summary['monthly'] = [
monthly = [
{
'month': m,
'core_hours': round(totals['monthly'].get(m, 0.0), 2),
'gpu_hours': round(totals.get('monthly_gpu', {}).get(m, 0.0), 2),
}
for m in sorted(set(totals['monthly']) | set(totals.get('monthly_gpu', {})))
]
summary['yearly'] = [
yearly = [
{
'year': y,
'core_hours': round(totals['yearly'].get(y, 0.0), 2),
'gpu_hours': round(totals.get('yearly_gpu', {}).get(y, 0.0), 2),
}
for y in sorted(set(totals['yearly']) | set(totals.get('yearly_gpu', {})))
]
summary['invoices'] = self.fetch_invoices(start_time, end_time)
summary['partitions'] = sorted(totals.get('partitions', []))
summary['accounts'] = sorted(totals.get('accounts', []))
summary['users'] = sorted(totals.get('users', []))
return daily, monthly, yearly

def _calculate_projected_revenue(
self,
start_dt: datetime,
end_dt: datetime,
cluster_cores: int,
rates_cfg: Dict[str, Any],
) -> float:
default_rate = rates_cfg.get('defaultRate', 0.01)
historical = rates_cfg.get('historicalRates', {})
start_date = start_dt.date()
end_date = end_dt.date()
current = date(start_date.year, start_date.month, 1)
end_marker = date(end_date.year, end_date.month, 1)
projected_revenue = 0.0
while current <= end_marker:
days_in_month = monthrange(current.year, current.month)[1]
month_start = date(current.year, current.month, 1)
month_end = date(current.year, current.month, days_in_month)
overlap_start = max(month_start, start_date)
overlap_end = min(month_end, end_date)
if overlap_start <= overlap_end:
days = (overlap_end - overlap_start).days + 1
rate = historical.get(current.strftime('%Y-%m'), default_rate)
projected_revenue += cluster_cores * 24 * days * rate
if current.month == 12:
current = date(current.year + 1, 1, 1)
else:
current = date(current.year, current.month + 1, 1)
return round(projected_revenue, 2)

def export_summary(
    self,
    start_time: Union[str, float],
    end_time: Union[str, float],
    rates_file: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble the usage and cost report for a reporting window.

    ``start_time`` and ``end_time`` may each be a string, which is parsed
    with ``_fromisoformat``, or a number, which is handed to
    ``datetime.fromtimestamp``. ``rates_file`` is forwarded unchanged to
    ``_load_rates``.

    Rates are a fixed cost per core-hour (for example, dollars per
    core-hour) and must be non-negative. ``discount`` values are
    fractions, so ``0.2`` means a 20% discount, and must lie between 0
    and 1 inclusive. A :class:`ValueError` is raised when those
    constraints are violated, or when ``_validate_cluster_cores`` rejects
    the cluster's core count.

    The returned dict has the keys ``summary``, ``details``, ``daily``,
    ``monthly``, ``yearly``, ``invoices``, ``partitions``, ``accounts``
    and ``users``.
    """

    def _as_datetime(moment: Union[str, float]) -> datetime:
        # Strings go through the ISO parser; anything else is treated as
        # a timestamp.
        if isinstance(moment, str):
            return _fromisoformat(moment)
        return datetime.fromtimestamp(moment)

    usage, totals = self.aggregate_usage(start_time, end_time)
    pricing = self._load_rates(rates_file)
    cluster = self.cluster_resources()
    core_count = self._validate_cluster_cores(cluster)
    details, core_hours, gpu_hours, cost = self._build_account_details(usage, pricing)

    period_start = _as_datetime(start_time)
    period_end = _as_datetime(end_time)

    daily, monthly, yearly = self._build_time_series(totals)

    first_day = period_start.strftime('%Y-%m-%d')
    last_day = period_end.strftime('%Y-%m-%d')
    overview = {
        'period': f"{first_day} to {last_day}",
        'total': round(cost, 2),
        'core_hours': round(core_hours, 2),
        'gpu_hours': round(gpu_hours, 2),
        'cluster': cluster,
    }

    report = {
        'summary': overview,
        'details': details,
        'daily': daily,
        'monthly': monthly,
        'yearly': yearly,
        'invoices': self.fetch_invoices(start_time, end_time),
    }
    # The three name listings are always emitted in sorted order.
    for listing in ('partitions', 'accounts', 'users'):
        report[listing] = sorted(totals.get(listing, []))

    # Projected revenue is computed last and added to the summary block.
    overview['projected_revenue'] = self._calculate_projected_revenue(
        period_start, period_end, core_count, pricing
    )
    return report


Expand Down
67 changes: 45 additions & 22 deletions test/unit/billing_summary.test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ def test_export_summary_aggregates_costs(self):
with mock.patch.object(
SlurmDB,
'aggregate_usage',
return_value=(usage, {
'daily': {},
'monthly': {},
'yearly': {},
'daily_gpu': {},
'monthly_gpu': {},
'yearly_gpu': {},
}),
return_value=(
usage,
{
'daily': {},
'monthly': {},
'yearly': {},
'daily_gpu': {},
'monthly_gpu': {},
'yearly_gpu': {},
},
),
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=invoices), mock.patch.object(
SlurmDB, 'cluster_resources', return_value={'cores': 100}
):
with mock.patch.object(SlurmDB, 'fetch_invoices', return_value=invoices):
db = SlurmDB()
summary = db.export_summary('2023-10-01', '2023-10-31')
db = SlurmDB()
summary = db.export_summary('2023-10-01', '2023-10-31')
self.assertEqual(summary['summary']['total'], 1.2)
self.assertEqual(summary['details'][0]['account'], 'acct')
self.assertEqual(summary['details'][0]['core_hours'], 10.0)
Expand Down Expand Up @@ -62,7 +66,9 @@ def test_export_summary_applies_overrides_and_discounts(self):
SlurmDB,
'aggregate_usage',
return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}),
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]):
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch.object(
SlurmDB, 'cluster_resources', return_value={'cores': 100}
):
db = SlurmDB()
summary = db.export_summary('2024-02-01', '2024-02-29')
costs = {d['account']: d['cost'] for d in summary['details']}
Expand Down Expand Up @@ -101,7 +107,9 @@ def test_export_summary_preserves_job_details(self):
SlurmDB,
'aggregate_usage',
return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}),
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]):
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch.object(
SlurmDB, 'cluster_resources', return_value={'cores': 100}
):
db = SlurmDB()
summary = db.export_summary('2024-03-01', '2024-03-31')
job = summary['details'][0]['users'][0]['jobs'][0]
Expand Down Expand Up @@ -133,6 +141,8 @@ def fake_open(path, *args, **kwargs):
return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}),
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch(
'builtins.open', side_effect=fake_open
), mock.patch.object(
SlurmDB, 'cluster_resources', return_value={'cores': 100}
):
db = SlurmDB()
with self.assertRaises(ValueError):
Expand All @@ -157,6 +167,8 @@ def fake_open(path, *args, **kwargs):
return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}),
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch(
'builtins.open', side_effect=fake_open
), mock.patch.object(
SlurmDB, 'cluster_resources', return_value={'cores': 100}
):
db = SlurmDB()
with self.assertRaises(ValueError):
Expand All @@ -181,6 +193,25 @@ def fake_open(path, *args, **kwargs):
return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}),
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch(
'builtins.open', side_effect=fake_open
), mock.patch.object(
SlurmDB, 'cluster_resources', return_value={'cores': 100}
):
db = SlurmDB()
with self.assertRaises(ValueError):
db.export_summary('2023-10-01', '2023-10-31')

def test_export_summary_invalid_cluster_cores(self):
usage = {
'2023-10': {
'acct': {'core_hours': 10.0, 'users': {}}
}
}
with mock.patch.object(
SlurmDB,
'aggregate_usage',
return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}),
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch.object(
SlurmDB, 'cluster_resources', return_value={'cores': 0}
):
db = SlurmDB()
with self.assertRaises(ValueError):
Expand All @@ -207,15 +238,7 @@ def fake_open(path, *args, **kwargs):
), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch(
'builtins.open', side_effect=fake_open
), mock.patch.object(
SlurmDB,
'_parse_slurm_conf',
return_value={
'nodes': 1,
'sockets': 1,
'cores': 100,
'threads': 1,
'gres': {},
},
SlurmDB, 'cluster_resources', return_value={'cores': 100}
):
db = SlurmDB()
summary = db.export_summary('2024-02-01', '2024-02-29')
Expand Down
Loading