From f7a3d9683c63ef3657f08b7a0989bc980607afd1 Mon Sep 17 00:00:00 2001 From: Robert Romero Date: Tue, 9 Sep 2025 06:31:44 -0700 Subject: [PATCH] refactor export_summary and add validation --- src/slurmdb.py | 206 ++++++++++++++++++------------ test/unit/billing_summary.test.py | 67 ++++++---- 2 files changed, 166 insertions(+), 107 deletions(-) diff --git a/src/slurmdb.py b/src/slurmdb.py index 5c4c74e..33554dc 100644 --- a/src/slurmdb.py +++ b/src/slurmdb.py @@ -7,6 +7,7 @@ from datetime import date, datetime, timedelta from calendar import monthrange from itertools import product +from typing import Any, Dict, List, Optional, Tuple, Union try: @@ -590,46 +591,37 @@ def fetch_invoices(self, start_date=None, end_date=None): for r in rows ] - def export_summary(self, start_time, end_time): - """Export a summary of usage and costs. - - Rates represent a fixed cost per core-hour (for example, dollars - per core-hour) and must be non-negative. ``discount`` values are - fractional percentages, where ``0.2`` means a 20% discount, and - they must fall between 0 and 1, inclusive. A :class:`ValueError` - is raised if these constraints are violated. - """ - - usage, totals = self.aggregate_usage(start_time, end_time) - summary = { - 'summary': {}, - 'details': [], - 'daily': [], - 'monthly': [], - 'yearly': [], - 'invoices': [], - } - total_ch = 0.0 - total_gpu = 0.0 - total_cost = 0.0 - - rates_path = os.path.join(os.path.dirname(__file__), 'rates.json') + def _load_rates(self, rates_file: Optional[str]) -> Dict[str, Any]: + path = rates_file or os.path.join(os.path.dirname(__file__), 'rates.json') try: - with open(rates_path) as fh: - rates_cfg = json.load(fh) + with open(path) as fh: + return json.load(fh) except OSError as e: - logging.warning("Unable to read rates file %s: %s", rates_path, e) - rates_cfg = {} + logging.warning("Unable to read rates file %s: %s", path, e) + return {} except json.JSONDecodeError as e: - logging.error("Failed to parse rates file %s: %s", rates_path, e) + logging.error("Failed to parse rates file %s: %s", path, e) raise + + def _validate_cluster_cores(self, resources: Dict[str, Any]) -> int: + cores = resources.get('cores') + if not isinstance(cores, (int, float)) or cores <= 0: + raise ValueError(f"Invalid cluster core count {cores}") + return int(cores) + + def _build_account_details( + self, + usage: Dict[str, Dict[str, Any]], + rates_cfg: Dict[str, Any], + ) -> Tuple[List[Dict[str, Any]], float, float, float]: default_rate = rates_cfg.get('defaultRate', 0.01) default_gpu_rate = rates_cfg.get('defaultGpuRate', 0.0) overrides = rates_cfg.get('overrides', {}) historical = rates_cfg.get('historicalRates', {}) gpu_historical = rates_cfg.get('historicalGpuRates', {}) - resources = self.cluster_resources() - cluster_cores = resources.get('cores') + + details: List[Dict[str, Any]] = [] + total_ch = total_gpu = total_cost = 0.0 for month, accounts in usage.items(): base_rate = historical.get(month, default_rate) @@ -643,23 +635,19 @@ def export_summary(self, start_time, end_time): if rate < 0: raise ValueError(f"Invalid rate {rate} for account {account}") if gpu_rate < 0: - raise ValueError( - f"Invalid GPU rate {gpu_rate} for account {account}" - ) + raise ValueError(f"Invalid GPU rate {gpu_rate} for account {account}") if not 0 <= discount <= 1: - raise ValueError( - f"Invalid discount {discount} for account {account}" - ) + raise ValueError(f"Invalid discount {discount} for account {account}") acct_cost = vals['core_hours'] * rate + vals.get('gpu_hours', 0.0) * gpu_rate if 0 < discount < 1: acct_cost *= 1 - discount - users = [] + users: List[Dict[str, Any]] = [] for user, uvals in vals.get('users', {}).items(): u_cost = uvals['core_hours'] * rate if 0 < discount < 1: u_cost *= 1 - discount - jobs = [] + jobs: List[Dict[str, Any]] = [] for job, jvals in uvals.get('jobs', {}).items(): j_cost = jvals['core_hours'] * rate if 0 < discount < 1: @@ -687,7 +675,7 @@ def export_summary(self, start_time, end_time): 'jobs': jobs, } ) - summary['details'].append( + details.append( { 'account': account, 'core_hours': round(vals['core_hours'], 2), @@ -699,45 +687,12 @@ def export_summary(self, start_time, end_time): total_ch += vals['core_hours'] total_gpu += vals.get('gpu_hours', 0.0) total_cost += acct_cost - start_dt = ( - _fromisoformat(start_time) - if isinstance(start_time, str) - else datetime.fromtimestamp(start_time) - ) - end_dt = ( - _fromisoformat(end_time) - if isinstance(end_time, str) - else datetime.fromtimestamp(end_time) - ) - summary['summary'] = { - 'period': f"{start_dt.strftime('%Y-%m-%d')} to {end_dt.strftime('%Y-%m-%d')}", - 'total': round(total_cost, 2), - 'core_hours': round(total_ch, 2), - 'gpu_hours': round(total_gpu, 2), - 'cluster': resources, - } - if cluster_cores: - start_date = start_dt.date() - end_date = end_dt.date() - current = date(start_date.year, start_date.month, 1) - end_marker = date(end_date.year, end_date.month, 1) - projected_revenue = 0.0 - while current <= end_marker: - days_in_month = monthrange(current.year, current.month)[1] - month_start = date(current.year, current.month, 1) - month_end = date(current.year, current.month, days_in_month) - overlap_start = max(month_start, start_date) - overlap_end = min(month_end, end_date) - if overlap_start <= overlap_end: - days = (overlap_end - overlap_start).days + 1 - rate = historical.get(current.strftime('%Y-%m'), default_rate) - projected_revenue += cluster_cores * 24 * days * rate - if current.month == 12: - current = date(current.year + 1, 1, 1) - else: - current = date(current.year, current.month + 1, 1) - summary['summary']['projected_revenue'] = round(projected_revenue, 2) - summary['daily'] = [ + return details, total_ch, total_gpu, total_cost + + def _build_time_series( + self, totals: Dict[str, Any] + ) -> Tuple[List[Dict[str, float]], List[Dict[str, float]], List[Dict[str, float]]]: + daily = [ { 'date': d, 'core_hours': round(totals['daily'].get(d, 0.0), 2), @@ -745,7 +700,7 @@ def export_summary(self, start_time, end_time): } for d in sorted(set(totals['daily']) | set(totals.get('daily_gpu', {}))) ] - summary['monthly'] = [ + monthly = [ { 'month': m, 'core_hours': round(totals['monthly'].get(m, 0.0), 2), @@ -753,7 +708,7 @@ def export_summary(self, start_time, end_time): } for m in sorted(set(totals['monthly']) | set(totals.get('monthly_gpu', {}))) ] - summary['yearly'] = [ + yearly = [ { 'year': y, 'core_hours': round(totals['yearly'].get(y, 0.0), 2), @@ -761,10 +716,91 @@ def export_summary(self, start_time, end_time): } for y in sorted(set(totals['yearly']) | set(totals.get('yearly_gpu', {}))) ] - summary['invoices'] = self.fetch_invoices(start_time, end_time) - summary['partitions'] = sorted(totals.get('partitions', [])) - summary['accounts'] = sorted(totals.get('accounts', [])) - summary['users'] = sorted(totals.get('users', [])) + return daily, monthly, yearly + + def _calculate_projected_revenue( + self, + start_dt: datetime, + end_dt: datetime, + cluster_cores: int, + rates_cfg: Dict[str, Any], + ) -> float: + default_rate = rates_cfg.get('defaultRate', 0.01) + historical = rates_cfg.get('historicalRates', {}) + start_date = start_dt.date() + end_date = end_dt.date() + current = date(start_date.year, start_date.month, 1) + end_marker = date(end_date.year, end_date.month, 1) + projected_revenue = 0.0 + while current <= end_marker: + days_in_month = monthrange(current.year, current.month)[1] + month_start = date(current.year, current.month, 1) + month_end = date(current.year, current.month, days_in_month) + overlap_start = max(month_start, start_date) + overlap_end = min(month_end, end_date) + if overlap_start <= overlap_end: + days = (overlap_end - overlap_start).days + 1 + rate = historical.get(current.strftime('%Y-%m'), default_rate) + projected_revenue += cluster_cores * 24 * days * rate + if current.month == 12: + current = date(current.year + 1, 1, 1) + else: + current = date(current.year, current.month + 1, 1) + return round(projected_revenue, 2) + + def export_summary( + self, + start_time: Union[str, float], + end_time: Union[str, float], + rates_file: Optional[str] = None, + ) -> Dict[str, Any]: + """Export a summary of usage and costs. + + Rates represent a fixed cost per core-hour (for example, dollars + per core-hour) and must be non-negative. ``discount`` values are + fractional percentages, where ``0.2`` means a 20% discount, and + they must fall between 0 and 1, inclusive. A :class:`ValueError` + is raised if these constraints are violated. + """ + + usage, totals = self.aggregate_usage(start_time, end_time) + rates_cfg = self._load_rates(rates_file) + resources = self.cluster_resources() + cluster_cores = self._validate_cluster_cores(resources) + details, total_ch, total_gpu, total_cost = self._build_account_details(usage, rates_cfg) + + start_dt = ( + _fromisoformat(start_time) + if isinstance(start_time, str) + else datetime.fromtimestamp(start_time) + ) + end_dt = ( + _fromisoformat(end_time) + if isinstance(end_time, str) + else datetime.fromtimestamp(end_time) + ) + + daily, monthly, yearly = self._build_time_series(totals) + summary = { + 'summary': { + 'period': f"{start_dt.strftime('%Y-%m-%d')} to {end_dt.strftime('%Y-%m-%d')}", + 'total': round(total_cost, 2), + 'core_hours': round(total_ch, 2), + 'gpu_hours': round(total_gpu, 2), + 'cluster': resources, + }, + 'details': details, + 'daily': daily, + 'monthly': monthly, + 'yearly': yearly, + 'invoices': self.fetch_invoices(start_time, end_time), + 'partitions': sorted(totals.get('partitions', [])), + 'accounts': sorted(totals.get('accounts', [])), + 'users': sorted(totals.get('users', [])), + } + summary['summary']['projected_revenue'] = self._calculate_projected_revenue( + start_dt, end_dt, cluster_cores, rates_cfg + ) return summary diff --git a/test/unit/billing_summary.test.py b/test/unit/billing_summary.test.py index 5d5a934..260e564 100644 --- a/test/unit/billing_summary.test.py +++ b/test/unit/billing_summary.test.py @@ -21,18 +21,22 @@ def test_export_summary_aggregates_costs(self): with mock.patch.object( SlurmDB, 'aggregate_usage', - return_value=(usage, { - 'daily': {}, - 'monthly': {}, - 'yearly': {}, - 'daily_gpu': {}, - 'monthly_gpu': {}, - 'yearly_gpu': {}, - }), + return_value=( + usage, + { + 'daily': {}, + 'monthly': {}, + 'yearly': {}, + 'daily_gpu': {}, + 'monthly_gpu': {}, + 'yearly_gpu': {}, + }, + ), + ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=invoices), mock.patch.object( + SlurmDB, 'cluster_resources', return_value={'cores': 100} ): - with mock.patch.object(SlurmDB, 'fetch_invoices', return_value=invoices): - db = SlurmDB() - summary = db.export_summary('2023-10-01', '2023-10-31') + db = SlurmDB() + summary = db.export_summary('2023-10-01', '2023-10-31') self.assertEqual(summary['summary']['total'], 1.2) self.assertEqual(summary['details'][0]['account'], 'acct') self.assertEqual(summary['details'][0]['core_hours'], 10.0) @@ -62,7 +66,9 @@ def test_export_summary_applies_overrides_and_discounts(self): SlurmDB, 'aggregate_usage', return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}), - ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]): + ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch.object( + SlurmDB, 'cluster_resources', return_value={'cores': 100} + ): db = SlurmDB() summary = db.export_summary('2024-02-01', '2024-02-29') costs = {d['account']: d['cost'] for d in summary['details']} @@ -101,7 +107,9 @@ def test_export_summary_preserves_job_details(self): SlurmDB, 'aggregate_usage', return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}), - ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]): + ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch.object( + SlurmDB, 'cluster_resources', return_value={'cores': 100} + ): db = SlurmDB() summary = db.export_summary('2024-03-01', '2024-03-31') job = summary['details'][0]['users'][0]['jobs'][0] @@ -133,6 +141,8 @@ def fake_open(path, *args, **kwargs): return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}), ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch( 'builtins.open', side_effect=fake_open + ), mock.patch.object( + SlurmDB, 'cluster_resources', return_value={'cores': 100} ): db = SlurmDB() with self.assertRaises(ValueError): @@ -157,6 +167,8 @@ def fake_open(path, *args, **kwargs): return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}), ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch( 'builtins.open', side_effect=fake_open + ), mock.patch.object( + SlurmDB, 'cluster_resources', return_value={'cores': 100} ): db = SlurmDB() with self.assertRaises(ValueError): @@ -181,6 +193,25 @@ def fake_open(path, *args, **kwargs): return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}), ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch( 'builtins.open', side_effect=fake_open + ), mock.patch.object( + SlurmDB, 'cluster_resources', return_value={'cores': 100} + ): + db = SlurmDB() + with self.assertRaises(ValueError): + db.export_summary('2023-10-01', '2023-10-31') + + def test_export_summary_invalid_cluster_cores(self): + usage = { + '2023-10': { + 'acct': {'core_hours': 10.0, 'users': {}} + } + } + with mock.patch.object( + SlurmDB, + 'aggregate_usage', + return_value=(usage, {'daily': {}, 'monthly': {}, 'yearly': {}}), + ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch.object( + SlurmDB, 'cluster_resources', return_value={'cores': 0} ): db = SlurmDB() with self.assertRaises(ValueError): @@ -207,15 +238,7 @@ def fake_open(path, *args, **kwargs): ), mock.patch.object(SlurmDB, 'fetch_invoices', return_value=[]), mock.patch( 'builtins.open', side_effect=fake_open ), mock.patch.object( - SlurmDB, - '_parse_slurm_conf', - return_value={ - 'nodes': 1, - 'sockets': 1, - 'cores': 100, - 'threads': 1, - 'gres': {}, - }, + SlurmDB, 'cluster_resources', return_value={'cores': 100} ): db = SlurmDB() summary = db.export_summary('2024-02-01', '2024-02-29')