From 58af30feb43c37b1e68fd354e91c9745f9d98def Mon Sep 17 00:00:00 2001 From: SameerAliKhan-git Date: Tue, 24 Mar 2026 18:09:30 +0530 Subject: [PATCH 1/2] feat: add /api/health and /api/health/solvers endpoints Adds two GET endpoints for cross-platform diagnostics: - /api/health returns platform info (OS, arch, Python version) and confirms DataStorage is writable - /api/health/solvers reports GLPK and CBC availability using the same three-tier resolution chain as Osemosys._resolve_solver_folder (env var > PATH > bundled) Includes pytest suite with 8 tests covering response shape, field presence, and mocked solver detection scenarios. Tracked under Track: Cross-Platform. --- API/Routes/System/HealthRoute.py | 96 +++++++++++++++++++++++++++ API/Routes/System/__init__.py | 0 API/app.py | 2 + tests/test_health.py | 109 +++++++++++++++++++++++++++++++ 4 files changed, 207 insertions(+) create mode 100644 API/Routes/System/HealthRoute.py create mode 100644 API/Routes/System/__init__.py create mode 100644 tests/test_health.py diff --git a/API/Routes/System/HealthRoute.py b/API/Routes/System/HealthRoute.py new file mode 100644 index 000000000..61f2760e8 --- /dev/null +++ b/API/Routes/System/HealthRoute.py @@ -0,0 +1,96 @@ +from flask import Blueprint, jsonify +import platform +import shutil +import os +from pathlib import Path +from Classes.Base import Config + +health_api = Blueprint('HealthRoute', __name__) + + +def _check_solver(binary_name, env_var): + """Check if a solver binary is reachable. + + Resolution order mirrors Osemosys._resolve_solver_folder: + 1. Environment variable + 2. System PATH (shutil.which) + 3. Bundled binary under SOLVERs_FOLDER + """ + + # 1 — env var + env_val = os.environ.get(env_var, "").strip().strip("\"'") + if env_val: + env_path = Path(env_val).expanduser() + if env_path.is_file(): + return {"found": True, "source": "env", "path": str(env_path)} + # directory — look inside + if env_path.is_dir(): + for name in _binary_names(binary_name): + candidate = env_path / name + if candidate.is_file(): + return {"found": True, "source": "env", "path": str(candidate)} + return {"found": False, "source": None, "path": None} + + # 2 — system PATH + for name in _binary_names(binary_name): + which_result = shutil.which(name) + if which_result: + return {"found": True, "source": "path", "path": which_result} + + # 3 — bundled + bundled_dir = Config.SOLVERs_FOLDER + for name in _binary_names(binary_name): + # check top-level and one level deep (GLPK/, COIN-OR/ subdirs) + for candidate in bundled_dir.rglob(name): + if candidate.is_file(): + return {"found": True, "source": "bundled", "path": str(candidate)} + + return {"found": False, "source": None, "path": None} + + +def _binary_names(binary_name): + """Return list of possible binary filenames for the current platform.""" + names = [binary_name] + if platform.system() == "Windows" and not binary_name.lower().endswith(".exe"): + names.insert(0, binary_name + ".exe") + return names + + +@health_api.route("/api/health", methods=['GET']) +def healthCheck(): + """Basic liveness check — confirms the Flask backend is running.""" + try: + # verify DataStorage is accessible + storage_ok = Config.DATA_STORAGE.is_dir() and os.access(Config.DATA_STORAGE, os.W_OK) + + response = { + "status": "ok", + "platform": platform.system(), + "architecture": platform.machine(), + "python": platform.python_version(), + "dataStorage": "writable" if storage_ok else "error" + } + return jsonify(response), 200 + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + + +@health_api.route("/api/health/solvers", methods=['GET']) +def solverStatus(): + """Report availability of GLPK and CBC solvers on this machine. + + Useful for cross-platform diagnostics — lets users (and the frontend) + know whether solvers are ready before attempting a model run. + """ + try: + glpk = _check_solver("glpsol", "SOLVER_GLPK_PATH") + cbc = _check_solver("cbc", "SOLVER_CBC_PATH") + + response = { + "glpk": glpk, + "cbc": cbc, + "anyAvailable": glpk["found"] or cbc["found"] + } + return jsonify(response), 200 + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 diff --git a/API/Routes/System/__init__.py b/API/Routes/System/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/API/app.py b/API/app.py index 76528a1a8..ff215c36f 100644 --- a/API/app.py +++ b/API/app.py @@ -16,6 +16,7 @@ from Routes.Case.SyncS3Route import syncs3_api from Routes.Case.ViewDataRoute import viewdata_api from Routes.DataFile.DataFileRoute import datafile_api +from Routes.System.HealthRoute import health_api #RADI # ------------------------- @@ -63,6 +64,7 @@ app.register_blueprint(viewdata_api) app.register_blueprint(datafile_api) app.register_blueprint(syncs3_api) +app.register_blueprint(health_api) CORS(app) diff --git a/tests/test_health.py b/tests/test_health.py new file mode 100644 index 000000000..25ed56ecc --- /dev/null +++ b/tests/test_health.py @@ -0,0 +1,109 @@ +"""Tests for the /api/health and /api/health/solvers endpoints.""" + +import json +import os +import sys +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +# --------------------------------------------------------------------------- +# Make sure the API package is importable when running from the repo root. +# The Flask app expects to run from inside API/, so we add that to sys.path +# the same way the start scripts do. +# --------------------------------------------------------------------------- +API_DIR = Path(__file__).resolve().parents[1] / "API" +if str(API_DIR) not in sys.path: + sys.path.insert(0, str(API_DIR)) + +from app import app # noqa: E402 + + +@pytest.fixture +def client(): + """Flask test client with testing mode enabled.""" + app.config["TESTING"] = True + with app.test_client() as c: + yield c + + +# ── /api/health ────────────────────────────────────────────────────────────── + +class TestHealthEndpoint: + + def test_returns_200(self, client): + resp = client.get("/api/health") + assert resp.status_code == 200 + + def test_response_contains_required_fields(self, client): + resp = client.get("/api/health") + data = json.loads(resp.data) + assert data["status"] == "ok" + assert "platform" in data + assert "python" in data + assert "architecture" in data + assert "dataStorage" in data + + def test_python_version_looks_valid(self, client): + resp = client.get("/api/health") + data = json.loads(resp.data) + # should be something like "3.11.9" + parts = data["python"].split(".") + assert len(parts) >= 2 + assert int(parts[0]) >= 3 + + +# ── /api/health/solvers ────────────────────────────────────────────────────── + +class TestSolverStatusEndpoint: + + def test_returns_200(self, client): + resp = client.get("/api/health/solvers") + assert resp.status_code == 200 + + def test_response_has_solver_keys(self, client): + resp = client.get("/api/health/solvers") + data = json.loads(resp.data) + assert "glpk" in data + assert "cbc" in data + assert "anyAvailable" in data + + def test_solver_entry_shape(self, client): + """Each solver entry should have found/source/path keys.""" + resp = client.get("/api/health/solvers") + data = json.loads(resp.data) + for solver_key in ("glpk", "cbc"): + entry = data[solver_key] + assert "found" in entry + assert "source" in entry + assert "path" in entry + + @patch("Routes.System.HealthRoute.shutil.which") + def test_glpk_found_on_path(self, mock_which, client): + """When glpsol is on PATH, glpk should report found=True.""" + def side_effect(name): + if name in ("glpsol", "glpsol.exe"): + return "/usr/bin/glpsol" + return None + mock_which.side_effect = side_effect + + resp = client.get("/api/health/solvers") + data = json.loads(resp.data) + assert data["glpk"]["found"] is True + assert data["glpk"]["source"] == "path" + + @patch("Routes.System.HealthRoute.shutil.which", return_value=None) + def test_no_solvers_reports_false(self, mock_which, client): + """When no solver is found anywhere, anyAvailable should be False.""" + # also need to make sure bundled dir scan finds nothing + with patch("Routes.System.HealthRoute.Config") as mock_cfg: + # point SOLVERs_FOLDER to a temp dir that has nothing + import tempfile + with tempfile.TemporaryDirectory() as tmpdir: + mock_cfg.SOLVERs_FOLDER = Path(tmpdir) + mock_cfg.DATA_STORAGE = Path(tmpdir) + resp = client.get("/api/health/solvers") + + data = json.loads(resp.data) + assert data["anyAvailable"] is False From 846eb89dff1156043a858911740363cfb5d5f00f Mon Sep 17 00:00:00 2001 From: SameerAliKhan-git Date: Tue, 24 Mar 2026 19:41:04 +0530 Subject: [PATCH 2/2] refactor: address Copilot review feedback on health check endpoints - implemented solver check caching with module variables - optimized bundled binary search by targeting specific subfolders - added binary filename validation for solver environment variables - improved security by avoiding leakage of raw exception strings to clients - updated health check to return 503 Service Unavailable on storage failure - improved test determinism with monkeypatch and cache resetting Ref: #367 --- API/Routes/System/HealthRoute.py | 70 ++++++++++++++++++++++++++------ tests/test_health.py | 36 ++++++++++++++-- 2 files changed, 90 insertions(+), 16 deletions(-) diff --git a/API/Routes/System/HealthRoute.py b/API/Routes/System/HealthRoute.py index 61f2760e8..02c0f85eb 100644 --- a/API/Routes/System/HealthRoute.py +++ b/API/Routes/System/HealthRoute.py @@ -2,11 +2,18 @@ import platform import shutil import os +import time +import logging from pathlib import Path from Classes.Base import Config health_api = Blueprint('HealthRoute', __name__) +# Basic module-level variables to cache solver check results +_SOLVER_CACHE_DATA = {} +_SOLVER_CACHE_TIME = 0.0 +CACHE_TTL = 300 # 5 minutes + def _check_solver(binary_name, env_var): """Check if a solver binary is reachable. @@ -16,35 +23,57 @@ def _check_solver(binary_name, env_var): 2. System PATH (shutil.which) 3. Bundled binary under SOLVERs_FOLDER """ + allowed_names = _binary_names(binary_name) # 1 — env var env_val = os.environ.get(env_var, "").strip().strip("\"'") if env_val: env_path = Path(env_val).expanduser() if env_path.is_file(): - return {"found": True, "source": "env", "path": str(env_path)} + # Validate that the file is actually one of the expected binary names + if env_path.name.lower() in [n.lower() for n in allowed_names]: + return {"found": True, "source": "env", "path": str(env_path)} # directory — look inside if env_path.is_dir(): - for name in _binary_names(binary_name): + for name in allowed_names: candidate = env_path / name if candidate.is_file(): return {"found": True, "source": "env", "path": str(candidate)} return {"found": False, "source": None, "path": None} # 2 — system PATH - for name in _binary_names(binary_name): + for name in allowed_names: which_result = shutil.which(name) if which_result: return {"found": True, "source": "path", "path": which_result} # 3 — bundled + # Optimize search by checking common subdirectories (GLPK, CBC) first bundled_dir = Config.SOLVERs_FOLDER - for name in _binary_names(binary_name): - # check top-level and one level deep (GLPK/, COIN-OR/ subdirs) - for candidate in bundled_dir.rglob(name): + # Subdirs to check to avoid full recursive scan of a large SOLVERs_FOLDER + search_dirs = [bundled_dir] + if binary_name.lower() == "glpsol": + search_dirs.append(bundled_dir / "GLPK") + elif binary_name.lower() == "cbc": + search_dirs.append(bundled_dir / "COIN-OR") + search_dirs.append(bundled_dir / "cbc") + + for s_dir in search_dirs: + if not s_dir.exists(): + continue + for name in allowed_names: + # Check shallowly first to avoid rglob complexity when possible + candidate = s_dir / name if candidate.is_file(): return {"found": True, "source": "bundled", "path": str(candidate)} + # Only rglob if we haven't found it (rglob is expensive) + # We limit to first match if rglob is used + for r_candidate in s_dir.rglob(name): + if r_candidate.is_file(): + return {"found": True, "source": "bundled", "path": str(r_candidate)} + break + return {"found": False, "source": None, "path": None} @@ -58,31 +87,41 @@ def _binary_names(binary_name): @health_api.route("/api/health", methods=['GET']) def healthCheck(): - """Basic liveness check — confirms the Flask backend is running.""" + """Basic liveness and readiness check — confirms the Flask backend is healthy.""" try: - # verify DataStorage is accessible + # verify DataStorage is accessible and writable storage_ok = Config.DATA_STORAGE.is_dir() and os.access(Config.DATA_STORAGE, os.W_OK) response = { - "status": "ok", + "status": "ok" if storage_ok else "error", "platform": platform.system(), "architecture": platform.machine(), "python": platform.python_version(), "dataStorage": "writable" if storage_ok else "error" } + + if not storage_ok: + return jsonify(response), 503 + return jsonify(response), 200 except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 + logging.error(f"Health check failed: {e}") + return jsonify({"status": "error", "message": "Failed to perform system health check"}), 500 @health_api.route("/api/health/solvers", methods=['GET']) def solverStatus(): """Report availability of GLPK and CBC solvers on this machine. - Useful for cross-platform diagnostics — lets users (and the frontend) - know whether solvers are ready before attempting a model run. + Uses simple module variables to avoid repeated disk scans during high-frequency polling. """ + global _SOLVER_CACHE_DATA, _SOLVER_CACHE_TIME try: + # Check cache + current_time = time.time() + if _SOLVER_CACHE_DATA and (current_time - _SOLVER_CACHE_TIME < CACHE_TTL): + return jsonify(_SOLVER_CACHE_DATA), 200 + glpk = _check_solver("glpsol", "SOLVER_GLPK_PATH") cbc = _check_solver("cbc", "SOLVER_CBC_PATH") @@ -91,6 +130,11 @@ def solverStatus(): "cbc": cbc, "anyAvailable": glpk["found"] or cbc["found"] } + + _SOLVER_CACHE_DATA = response + _SOLVER_CACHE_TIME = current_time + return jsonify(response), 200 except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 + logging.error(f"Solver health check failed: {e}") + return jsonify({"status": "error", "message": "Failed to perform solver health check"}), 500 diff --git a/tests/test_health.py b/tests/test_health.py index 25ed56ecc..0988f7091 100644 --- a/tests/test_health.py +++ b/tests/test_health.py @@ -1,10 +1,9 @@ """Tests for the /api/health and /api/health/solvers endpoints.""" import json -import os import sys from pathlib import Path -from unittest.mock import patch, MagicMock +from unittest.mock import patch import pytest @@ -18,6 +17,7 @@ sys.path.insert(0, str(API_DIR)) from app import app # noqa: E402 +from Routes.System import HealthRoute # noqa: E402 @pytest.fixture @@ -28,14 +28,44 @@ def client(): yield c +@pytest.fixture(autouse=True) +def reset_health_cache(monkeypatch): + """Ensure every test runs on a clean environment/cache. + + Copilot Review point: Ensure solver env vars are cleared so tests + deterministically use the mocked shutil.which or bundled paths. + """ + monkeypatch.setattr(HealthRoute, "_SOLVER_CACHE_DATA", {}) + monkeypatch.setattr(HealthRoute, "_SOLVER_CACHE_TIME", 0.0) + # Clear environment variables so they don't interfere with PATH/bundled tests + for var in ["SOLVER_GLPK_PATH", "SOLVER_CBC_PATH"]: + monkeypatch.delenv(var, raising=False) + + # ── /api/health ────────────────────────────────────────────────────────────── class TestHealthEndpoint: - def test_returns_200(self, client): + def test_returns_200_when_healthy(self, client): resp = client.get("/api/health") assert resp.status_code == 200 + def test_returns_503_when_storage_unwritable(self, client): + """Copilot Review point: Return non-200 when storage is degraded.""" + with patch("Routes.System.HealthRoute.Config") as mock_cfg: + import tempfile + with tempfile.TemporaryDirectory() as tmpdir: + # Mock a directory that exists but we'll make reports as unwritable + # (Easiest way in this context is to mock os.access or just the dir check result) + mock_cfg.DATA_STORAGE = Path(tmpdir) + with patch("Routes.System.HealthRoute.os.access", return_value=False): + resp = client.get("/api/health") + + assert resp.status_code == 503 + data = json.loads(resp.data) + assert data["status"] == "error" + assert data["dataStorage"] == "error" + def test_response_contains_required_fields(self, client): resp = client.get("/api/health") data = json.loads(resp.data)