From 70b362862ac6a75e9805ec4e446818a5b852ab01 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 4 Mar 2026 00:19:31 +0000 Subject: [PATCH 1/6] =?UTF-8?q?=E2=9A=A1=20Bolt:=20optimize=20control=20li?= =?UTF-8?q?st=20and=20assessment=20lookups?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add composite index `idx_control_date` to `AssessmentRecord` table. - Implement optimized `get_latest_assessments` helper in `database.py`. - Refactor `list_controls` to use single SQL JOIN and DB-level filtering. - Remove redundant query logic from reports and assessment routers. - Add `benchmark_controls.py` for performance verification. This optimization reduces database roundtrips and memory overhead, providing a scalable foundation for larger compliance frameworks. Co-authored-by: AGI-Corporation <186229839+AGI-Corporation@users.noreply.github.com> --- .jules/bolt.md | 7 +++ backend/db/database.py | 37 +++++++++++- backend/routers/assessment.py | 22 +------- backend/routers/controls.py | 67 ++++++++++++---------- backend/routers/reports.py | 35 +++--------- benchmark_controls.py | 103 ++++++++++++++++++++++++++++++++++ 6 files changed, 190 insertions(+), 81 deletions(-) create mode 100644 .jules/bolt.md create mode 100644 benchmark_controls.py diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..ed6cc2c --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,7 @@ +## 2025-03-04 - [Database-Level Filtering for Implementation Status] +**Learning:** In this architecture, 'not_started' status is represented by the absence of an `AssessmentRecord`. Python-side filtering for this status after fetching all controls is an anti-pattern that leads to O(N) database queries or massive over-fetching. +**Action:** Use a `LEFT OUTER JOIN` with a subquery for the latest assessment and explicitly check `AssessmentRecord.id.is_(None)` in the SQL `WHERE` clause to filter for 'not_started' controls at the database level. + +## 2025-03-04 - [Optimizing Latest-State Queries] +**Learning:** Fetching the "latest state" for 110+ controls (Standard CMMC L2) is the most frequent and expensive operation. A composite index on `(control_id, assessment_date)` is critical for the performance of the subquery join used to resolve the latest assessment. +**Action:** Always ensure `AssessmentRecord` has a composite index on `control_id` and `assessment_date` when implementing framework-wide status lookups. diff --git a/backend/db/database.py b/backend/db/database.py index 8d26dc3..7e53b1e 100644 --- a/backend/db/database.py +++ b/backend/db/database.py @@ -6,7 +6,7 @@ import json from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker, DeclarativeBase -from sqlalchemy import Column, String, Integer, Float, DateTime, Text, JSON, select +from sqlalchemy import Column, String, Integer, Float, DateTime, Text, JSON, select, Index, func from datetime import datetime, UTC DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./cmmc.db") @@ -72,6 +72,10 @@ class AssessmentRecord(Base): next_review = Column(DateTime) poam_required = Column(String, default="false") + __table_args__ = ( + Index('idx_control_date', 'control_id', 'assessment_date'), + ) + class AgentRunRecord(Base): __tablename__ = "agent_runs" @@ -126,3 +130,34 @@ async def get_db(): raise finally: await session.close() + + +async def get_latest_assessments(db: AsyncSession, control_ids: list[str] = None): + """ + Get the most recent AssessmentRecord for each control_id. + Optionally filter by a list of control_ids. + """ + sub_q = ( + select( + AssessmentRecord.control_id, + func.max(AssessmentRecord.assessment_date).label("max_date") + ) + .group_by(AssessmentRecord.control_id) + ) + + if control_ids: + sub_q = sub_q.where(AssessmentRecord.control_id.in_(control_ids)) + + sub_q = sub_q.subquery() + + query = ( + select(AssessmentRecord) + .join( + sub_q, + (AssessmentRecord.control_id == sub_q.c.control_id) & + (AssessmentRecord.assessment_date == sub_q.c.max_date) + ) + ) + + result = await db.execute(query) + return {a.control_id: a for a in result.scalars().all()} diff --git a/backend/routers/assessment.py b/backend/routers/assessment.py index 89709e0..a37f2e8 100644 --- a/backend/routers/assessment.py +++ b/backend/routers/assessment.py @@ -11,7 +11,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func -from backend.db.database import get_db, ControlRecord, AssessmentRecord, AgentRunRecord +from backend.db.database import get_db, ControlRecord, AssessmentRecord, AgentRunRecord, get_latest_assessments router = APIRouter() @@ -55,26 +55,6 @@ class SPRSResult(BaseModel): certification_level: str assessment_date: str -async def get_latest_assessments(db: AsyncSession): - sub_q = ( - select( - AssessmentRecord.control_id, - func.max(AssessmentRecord.assessment_date).label("max_date") - ) - .group_by(AssessmentRecord.control_id) - .subquery() - ) - query = ( - select(AssessmentRecord) - .join( - sub_q, - (AssessmentRecord.control_id == sub_q.c.control_id) & - (AssessmentRecord.assessment_date == sub_q.c.max_date) - ) - ) - result = await db.execute(query) - return {a.control_id: a for a in result.scalars().all()} - @router.get( "/dashboard", response_model=DashboardSummary, diff --git a/backend/routers/controls.py b/backend/routers/controls.py index 6ecfa84..76b9ad0 100644 --- a/backend/routers/controls.py +++ b/backend/routers/controls.py @@ -7,7 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func -from backend.db.database import get_db, ControlRecord, AssessmentRecord +from backend.db.database import get_db, ControlRecord, AssessmentRecord, get_latest_assessments from backend.models.control import ( Control, ControlResponse, ControlListResponse, ControlUpdate, CMMCLevel, ControlDomain, ImplementationStatus @@ -27,14 +27,10 @@ async def list_controls( status: Optional[ImplementationStatus] = Query(None, description="Filter by implementation status"), db: AsyncSession = Depends(get_db) ): - # Base query for controls - query = select(ControlRecord) - if level: - query = query.where(ControlRecord.level == level.value) - if domain: - query = query.where(ControlRecord.domain == domain.value) - - # Efficiently get latest assessments for all relevant controls + """ + Optimized list_controls using single query JOIN for performance. + """ + # 1. Get latest assessment subquery sub_q = ( select( AssessmentRecord.control_id, @@ -44,30 +40,37 @@ async def list_controls( .subquery() ) - assessments_q = ( - select(AssessmentRecord) - .join( + # 2. Main query joining Controls with latest Assessments + query = ( + select(ControlRecord, AssessmentRecord) + .outerjoin( sub_q, + ControlRecord.id == sub_q.c.control_id + ) + .outerjoin( + AssessmentRecord, (AssessmentRecord.control_id == sub_q.c.control_id) & (AssessmentRecord.assessment_date == sub_q.c.max_date) ) ) - ctrl_result = await db.execute(query) - controls_data = ctrl_result.scalars().all() - - ass_result = await db.execute(assessments_q) - assessments_map = {a.control_id: a for a in ass_result.scalars().all()} - - responses = [] - for c in controls_data: - assessment = assessments_map.get(c.id) - impl_status = assessment.status if assessment else "not_started" + # 3. Apply Filters at DB level + if level: + query = query.where(ControlRecord.level == level.value) + if domain: + query = query.where(ControlRecord.domain == domain.value) + if status: + if status.value == "not_started": + # "not_started" means no assessment record exists + query = query.where(AssessmentRecord.id.is_(None)) + else: + query = query.where(AssessmentRecord.status == status.value) - if status and impl_status != status.value: - continue + result = await db.execute(query) + rows = result.all() - responses.append(ControlResponse( + responses = [ + ControlResponse( control=Control( id=c.id, title=c.title, @@ -77,12 +80,14 @@ async def list_controls( nist_mapping=c.nist_mapping, weight=c.score_value ), - implementation_status=impl_status, - evidence_count=len(assessment.evidence_ids) if assessment and isinstance(assessment.evidence_ids, list) else 0, - notes=assessment.notes if assessment else None, - confidence=assessment.confidence if assessment else 0.0, - poam_required=(assessment.poam_required == "true") if assessment else False - )) + implementation_status=a.status if a else "not_started", + evidence_count=len(a.evidence_ids) if a and isinstance(a.evidence_ids, list) else 0, + notes=a.notes if a else None, + confidence=a.confidence if a else 0.0, + poam_required=(a.poam_required == "true") if a else False + ) + for c, a in rows + ] return ControlListResponse(controls=responses, total=len(responses), level_filter=level, domain_filter=domain) diff --git a/backend/routers/reports.py b/backend/routers/reports.py index afe7c25..6549f98 100644 --- a/backend/routers/reports.py +++ b/backend/routers/reports.py @@ -15,34 +15,10 @@ import io import json -from backend.db.database import get_db, AssessmentRecord, ControlRecord, EvidenceRecord +from backend.db.database import get_db, AssessmentRecord, ControlRecord, EvidenceRecord, get_latest_assessments router = APIRouter() -async def get_latest_assessments(db: AsyncSession): - # Subquery for latest assessment date per control_id - subquery = ( - select( - AssessmentRecord.control_id, - func.max(AssessmentRecord.assessment_date).label("max_date") - ) - .group_by(AssessmentRecord.control_id) - .subquery() - ) - - # Join with the original table to get full records - query = ( - select(AssessmentRecord) - .join( - subquery, - (AssessmentRecord.control_id == subquery.c.control_id) & - (AssessmentRecord.assessment_date == subquery.c.max_date) - ) - ) - - result = await db.execute(query) - return result.scalars().all() - @router.get("/ssp", summary="Generate System Security Plan (SSP) in Markdown") async def generate_ssp( system_name: str = "AGI Corp CMMC System", @@ -54,7 +30,8 @@ async def generate_ssp( Includes: system overview, control family summaries, implementation status. """ # Fetch latest assessments - assessments = await get_latest_assessments(db) + assessments_map = await get_latest_assessments(db) + assessments = list(assessments_map.values()) controls_result = await db.execute(select(ControlRecord)) controls = {c.id: c for c in controls_result.scalars().all()} @@ -146,7 +123,8 @@ async def generate_poam( Generate a Plan of Action & Milestones (POA&M) as CSV. Includes all partial and not_implemented controls. """ - assessments = await get_latest_assessments(db) + assessments_map = await get_latest_assessments(db) + assessments = list(assessments_map.values()) controls_result = await db.execute(select(ControlRecord)) controls = {c.id: c for c in controls_result.scalars().all()} @@ -189,7 +167,8 @@ async def get_dashboard( db: AsyncSession = Depends(get_db), ): """Return compliance posture summary for dashboard rendering.""" - assessments = await get_latest_assessments(db) + assessments_map = await get_latest_assessments(db) + assessments = list(assessments_map.values()) status_counts = {"implemented": 0, "partial": 0, "planned": 0, "not_implemented": 0, "na": 0} diff --git a/benchmark_controls.py b/benchmark_controls.py new file mode 100644 index 0000000..907ef8b --- /dev/null +++ b/benchmark_controls.py @@ -0,0 +1,103 @@ + +import asyncio +import time +import os +import uuid +import statistics +from datetime import datetime, timedelta, UTC +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession +from sqlalchemy.orm import sessionmaker +from httpx import AsyncClient, ASGITransport + +# Set up environment for benchmark +os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./benchmark.db" + +from backend.main import app +from backend.db.database import Base, ControlRecord, AssessmentRecord, init_db + +async def seed_benchmark_data(): + engine = create_async_engine("sqlite+aiosqlite:///./benchmark.db") + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await conn.run_sync(Base.metadata.create_all) + + AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + async with AsyncSessionLocal() as session: + # 1. Seed 110 controls (Standard CMMC Level 2) + controls = [] + for i in range(110): + cid = f"AC.2.{i+1:03d}" + ctrl = ControlRecord( + id=cid, + domain="AC", + level="Level 2", + title=f"Benchmark Control {i+1}", + description=f"Description for control {i+1}", + score_value=1 if i % 5 != 0 else 5 + ) + controls.append(ctrl) + session.add(ctrl) + + # 2. Seed 10 assessments per control (1100 total) + # We want to simulate history, so the latest one is what matters + for ctrl in controls: + for j in range(10): + assessment = AssessmentRecord( + id=str(uuid.uuid4()), + control_id=ctrl.id, + status="implemented" if j == 9 else "not_implemented", + assessment_date=datetime.now(UTC) - timedelta(days=10-j), + confidence=0.9, + notes=f"Assessment history {j}", + evidence_ids=["ev-1", "ev-2"] + ) + session.add(assessment) + + await session.commit() + await engine.dispose() + +async def run_benchmark(): + print("Seeding benchmark data...") + await seed_benchmark_data() + + print("Running benchmark on /api/controls/ ...") + + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + # 1. No filter + print("\nTest 1: No filter") + latencies = [] + await ac.get("/api/controls/") # Warmup + for _ in range(50): + start_time = time.perf_counter() + response = await ac.get("/api/controls/") + end_time = time.perf_counter() + latencies.append((end_time - start_time) * 1000) + print(f"Avg: {statistics.mean(latencies):.2f} ms, P95: {statistics.quantiles(latencies, n=20)[18]:.2f} ms") + + # 2. Status filter (should be faster with SQL filtering) + print("\nTest 2: Status filter (implemented)") + latencies = [] + await ac.get("/api/controls/?status=implemented") # Warmup + for _ in range(50): + start_time = time.perf_counter() + response = await ac.get("/api/controls/?status=implemented") + end_time = time.perf_counter() + latencies.append((end_time - start_time) * 1000) + print(f"Avg: {statistics.mean(latencies):.2f} ms, P95: {statistics.quantiles(latencies, n=20)[18]:.2f} ms") + + avg_latency = statistics.mean(latencies) + median_latency = statistics.median(latencies) + p95_latency = statistics.quantiles(latencies, n=20)[18] # 19th 5-percentile is p95 + + print(f"\nResults (50 requests):") + print(f"Average Latency: {avg_latency:.2f} ms") + print(f"Median Latency: {median_latency:.2f} ms") + print(f"P95 Latency: {p95_latency:.2f} ms") + + # Clean up + if os.path.exists("./benchmark.db"): + os.remove("./benchmark.db") + +if __name__ == "__main__": + asyncio.run(run_benchmark()) From 981afa489a07fdfc05d62e001ff881fdf817815a Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 4 Mar 2026 00:21:48 +0000 Subject: [PATCH 2/6] =?UTF-8?q?=E2=9A=A1=20Bolt:=20optimize=20control=20li?= =?UTF-8?q?st=20and=20assessment=20lookups?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add composite index `idx_control_date` to `AssessmentRecord` table. - Implement optimized `get_latest_assessments` helper in `database.py`. - Refactor `list_controls` to use single SQL JOIN and DB-level filtering. - Remove redundant query logic from reports and assessment routers. - Add `benchmark_controls.py` for performance verification. - Apply linting and formatting (black, isort). This optimization reduces database roundtrips and memory overhead, providing a scalable foundation for larger compliance frameworks. Note: CI failure was due to account billing issue, unrelated to code changes. Co-authored-by: AGI-Corporation <186229839+AGI-Corporation@users.noreply.github.com> --- agents/devsecops_agent/agent.py | 100 ++++++++++++++++----- agents/icam_agent/agent.py | 153 ++++++++++++++++++++++---------- agents/mistral_agent/agent.py | 79 ++++++++++++----- agents/orchestrator/agent.py | 119 ++++++++++++++++--------- backend/db/database.py | 57 ++++++------ backend/main.py | 29 +++--- backend/models/control.py | 34 +++++-- backend/models/evidence.py | 9 +- backend/routers/assessment.py | 89 ++++++++++++++----- backend/routers/controls.py | 99 +++++++++++++-------- backend/routers/evidence.py | 32 ++++--- backend/routers/reports.py | 113 ++++++++++++++++------- benchmark_controls.py | 45 ++++++---- tests/test_agents.py | 46 +++++++--- tests/test_backend.py | 40 ++++++--- 15 files changed, 707 insertions(+), 337 deletions(-) diff --git a/agents/devsecops_agent/agent.py b/agents/devsecops_agent/agent.py index 16beb58..5965179 100644 --- a/agents/devsecops_agent/agent.py +++ b/agents/devsecops_agent/agent.py @@ -5,14 +5,17 @@ Aligns with CMMC CM/SI domains, Fulcrum LOE 2, Cloud Security Playbook Play 19-21. Responsibilities: container scanning, SBOM generation, pipeline gate evaluation. """ + import uuid -from datetime import datetime, UTC -from typing import Dict, List, Any, Optional from dataclasses import dataclass, field +from datetime import UTC, datetime from enum import Enum +from typing import Any, Dict, List, Optional + from fastapi import APIRouter, Depends from sqlalchemy.ext.asyncio import AsyncSession -from backend.db.database import get_db, AgentRunRecord + +from backend.db.database import AgentRunRecord, get_db class SeverityLevel(str, Enum): @@ -33,20 +36,35 @@ class DevSecOpsAgent: def __init__(self, mock_mode: bool = True): self.mock_mode = mock_mode - def scan_container_image(self, image_name: str, image_tag: str = "latest", - base_image: Optional[str] = None) -> Dict[str, Any]: + def scan_container_image( + self, + image_name: str, + image_tag: str = "latest", + base_image: Optional[str] = None, + ) -> Dict[str, Any]: """Simulate CVE scan. Production: Trivy/Grype. Maps to NIST 800-190, CMMC SI.1.210.""" - cves = [ - {"cve_id": "CVE-2024-1234", "severity": "high", "package": "openssl", - "version": "3.0.1", "fixed_in": "3.0.2", "cmmc_controls": ["SI.1.210", "SI.2.214"]} - ] if "vulnerable" in image_name else [] + cves = ( + [ + { + "cve_id": "CVE-2024-1234", + "severity": "high", + "package": "openssl", + "version": "3.0.1", + "fixed_in": "3.0.2", + "cmmc_controls": ["SI.1.210", "SI.2.214"], + } + ] + if "vulnerable" in image_name + else [] + ) return { "image": f"{image_name}:{image_tag}", "base_image": base_image, "base_image_approved": base_image in self.APPROVED_BASE_IMAGES, "overall_risk": "pass" if not cves else "high", "cve_findings": cves, - "critical_count": 0, "high_count": len(cves), + "critical_count": 0, + "high_count": len(cves), "evidence_id": str(uuid.uuid4()), "scanned_at": datetime.now(UTC).isoformat(), "cmmc_controls": ["SI.1.210", "SI.2.214", "CM.2.061", "CM.3.068"], @@ -56,14 +74,34 @@ def scan_container_image(self, image_name: str, image_tag: str = "latest", def generate_sbom(self, service_name: str) -> Dict[str, Any]: """Generate CycloneDX SBOM. Maps to EO 14028, CMMC CM.2.062.""" components = [ - {"name": "fastapi", "version": "0.111.0", "license": "MIT", - "supplier": "tiangolo", "cves": 0}, - {"name": "mistralai", "version": "0.4.2", "license": "Apache-2.0", - "supplier": "Mistral AI", "cves": 0}, - {"name": "sqlalchemy", "version": "2.0.30", "license": "MIT", - "supplier": "SQLAlchemy", "cves": 0}, - {"name": "pydantic", "version": "2.7.0", "license": "MIT", - "supplier": "pydantic", "cves": 0}, + { + "name": "fastapi", + "version": "0.111.0", + "license": "MIT", + "supplier": "tiangolo", + "cves": 0, + }, + { + "name": "mistralai", + "version": "0.4.2", + "license": "Apache-2.0", + "supplier": "Mistral AI", + "cves": 0, + }, + { + "name": "sqlalchemy", + "version": "2.0.30", + "license": "MIT", + "supplier": "SQLAlchemy", + "cves": 0, + }, + { + "name": "pydantic", + "version": "2.7.0", + "license": "MIT", + "supplier": "pydantic", + "cves": 0, + }, ] return { "sbom_format": "CycloneDX-1.5", @@ -80,8 +118,12 @@ def generate_sbom(self, service_name: str) -> Dict[str, Any]: def evaluate_pipeline_gates(self, pipeline_id: str) -> Dict[str, Any]: """Evaluate CI/CD security gates. Maps to Cloud Security Playbook Play 20.""" gate_results = { - "sast": "pass", "secret_scan": "pass", "dependency_check": "warn", - "container_scan": "pass", "sbom_generation": "pass", "image_signing": "fail", + "sast": "pass", + "secret_scan": "pass", + "dependency_check": "warn", + "container_scan": "pass", + "sbom_generation": "pass", + "image_signing": "fail", } failed = [g for g, s in gate_results.items() if s == "fail"] warned = [g for g, s in gate_results.items() if s == "warn"] @@ -102,7 +144,9 @@ def evaluate_pipeline_gates(self, pipeline_id: str) -> Dict[str, Any]: "evaluated_at": datetime.now(UTC).isoformat(), } - async def run_full_assessment(self, db: AsyncSession, service_name: str = "cmmc-api", trigger: str = "manual") -> Dict[str, Any]: + async def run_full_assessment( + self, db: AsyncSession, service_name: str = "cmmc-api", trigger: str = "manual" + ) -> Dict[str, Any]: """Run complete DevSecOps assessment pipeline.""" image_scan = self.scan_container_image(service_name) sbom = self.generate_sbom(service_name) @@ -131,11 +175,17 @@ async def run_full_assessment(self, db: AsyncSession, service_name: str = "cmmc- agent_type="devsecops", trigger=trigger, scope=service_name, - controls_evaluated=list(set(image_scan["cmmc_controls"] + sbom["cmmc_controls"] + pipeline["cmmc_controls"])), + controls_evaluated=list( + set( + image_scan["cmmc_controls"] + + sbom["cmmc_controls"] + + pipeline["cmmc_controls"] + ) + ), findings=result, status="completed", created_at=datetime.now(UTC), - completed_at=datetime.now(UTC) + completed_at=datetime.now(UTC), ) db.add(record) await db.commit() @@ -147,7 +197,9 @@ async def run_full_assessment(self, db: AsyncSession, service_name: str = "cmmc- _dso = DevSecOpsAgent() -@router.get("/assess/{service_name}", summary="Run full DevSecOps ZT Application assessment") +@router.get( + "/assess/{service_name}", summary="Run full DevSecOps ZT Application assessment" +) async def assess_service(service_name: str, db: AsyncSession = Depends(get_db)): """Container scan + SBOM + pipeline gates - ZT Application Pillar evidence.""" return await _dso.run_full_assessment(db, service_name) diff --git a/agents/icam_agent/agent.py b/agents/icam_agent/agent.py index d262952..313f703 100644 --- a/agents/icam_agent/agent.py +++ b/agents/icam_agent/agent.py @@ -5,20 +5,39 @@ Maps to DoD ZT User Pillar and CMMC Access Control (AC) / Identification & Authentication (IA) domains. Implements Fulcrum LOE 1 - Identity management. """ + import uuid -from datetime import datetime, timedelta, UTC -from typing import Dict, List, Any, Optional from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta +from typing import Any, Dict, List, Optional + from fastapi import APIRouter, Depends from sqlalchemy.ext.asyncio import AsyncSession -from backend.db.database import get_db, AgentRunRecord + +from backend.db.database import AgentRunRecord, get_db # CMMC AC + IA controls owned by this agent ICAM_CONTROLS = [ - "AC.1.001", "AC.1.002", "AC.2.005", "AC.2.006", "AC.2.007", - "AC.2.008", "AC.2.009", "AC.2.010", "AC.2.011", "AC.2.013", - "AC.2.015", "AC.2.016", "IA.1.076", "IA.1.077", "IA.2.078", - "IA.2.079", "IA.2.080", "IA.2.081", "IA.2.082", "IA.3.083", + "AC.1.001", + "AC.1.002", + "AC.2.005", + "AC.2.006", + "AC.2.007", + "AC.2.008", + "AC.2.009", + "AC.2.010", + "AC.2.011", + "AC.2.013", + "AC.2.015", + "AC.2.016", + "IA.1.076", + "IA.1.077", + "IA.2.078", + "IA.2.079", + "IA.2.080", + "IA.2.081", + "IA.2.082", + "IA.3.083", "IA.3.084", ] @@ -63,36 +82,52 @@ def _load_mock_users(self): """Load representative mock user data for demo.""" self.users = [ UserRecord( - user_id="u001", username="alice.admin", + user_id="u001", + username="alice.admin", roles=["SystemAdmin", "CUI_Handler"], - mfa_enabled=True, mfa_type="fido2", + mfa_enabled=True, + mfa_type="fido2", last_login=datetime.now(UTC) - timedelta(days=1), - account_status="active", privileged=True, - department="IT", last_access_review=datetime.now(UTC) - timedelta(days=30), + account_status="active", + privileged=True, + department="IT", + last_access_review=datetime.now(UTC) - timedelta(days=30), ), UserRecord( - user_id="u002", username="bob.dev", + user_id="u002", + username="bob.dev", roles=["Developer"], - mfa_enabled=True, mfa_type="totp", + mfa_enabled=True, + mfa_type="totp", last_login=datetime.now(UTC) - timedelta(days=2), - account_status="active", privileged=False, - department="Engineering", last_access_review=datetime.now(UTC) - timedelta(days=60), + account_status="active", + privileged=False, + department="Engineering", + last_access_review=datetime.now(UTC) - timedelta(days=60), ), UserRecord( - user_id="u003", username="carol.svc", + user_id="u003", + username="carol.svc", roles=["ServiceAccount"], - mfa_enabled=False, mfa_type="none", + mfa_enabled=False, + mfa_type="none", last_login=None, - account_status="active", privileged=True, - department="Automation", last_access_review=None, + account_status="active", + privileged=True, + department="Automation", + last_access_review=None, ), UserRecord( - user_id="u004", username="dave.old", + user_id="u004", + username="dave.old", roles=["Developer"], - mfa_enabled=False, mfa_type="none", + mfa_enabled=False, + mfa_type="none", last_login=datetime.now(UTC) - timedelta(days=200), - account_status="active", privileged=False, - department="Engineering", last_access_review=datetime.now(UTC) - timedelta(days=400), + account_status="active", + privileged=False, + department="Engineering", + last_access_review=datetime.now(UTC) - timedelta(days=400), ), ] @@ -104,7 +139,9 @@ def check_mfa_coverage(self) -> ICAMAssessmentResult: privileged_with_mfa = [u for u in privileged_users if u.mfa_enabled] coverage_pct = len(mfa_users) / total if total else 0 - privileged_coverage = len(privileged_with_mfa) / len(privileged_users) if privileged_users else 1 + privileged_coverage = ( + len(privileged_with_mfa) / len(privileged_users) if privileged_users else 1 + ) findings = [] remediation = [] @@ -114,11 +151,15 @@ def check_mfa_coverage(self) -> ICAMAssessmentResult: remediation.append("Enroll all users in MFA - enforce via IdP policy") if privileged_coverage < 1.0: findings.append("Privileged accounts without MFA detected - CRITICAL gap") - remediation.append("Immediately enroll privileged accounts in FIDO2/hardware MFA") + remediation.append( + "Immediately enroll privileged accounts in FIDO2/hardware MFA" + ) - confidence = (coverage_pct * 0.5 + privileged_coverage * 0.5) - status = "implemented" if confidence >= 0.95 else ( - "partially_implemented" if confidence >= 0.5 else "not_implemented" + confidence = coverage_pct * 0.5 + privileged_coverage * 0.5 + status = ( + "implemented" + if confidence >= 0.95 + else ("partially_implemented" if confidence >= 0.5 else "not_implemented") ) return ICAMAssessmentResult( @@ -133,11 +174,13 @@ def check_mfa_coverage(self) -> ICAMAssessmentResult: def check_least_privilege(self) -> ICAMAssessmentResult: """Assess AC.2.007 - Principle of least privilege.""" stale_accounts = [ - u for u in self.users + u + for u in self.users if u.last_login and (datetime.now(UTC) - u.last_login).days > 90 ] unreviewed = [ - u for u in self.users + u + for u in self.users if not u.last_access_review or (datetime.now(UTC) - u.last_access_review).days > 365 ] @@ -146,14 +189,22 @@ def check_least_privilege(self) -> ICAMAssessmentResult: remediation = [] if stale_accounts: findings.append(f"{len(stale_accounts)} accounts inactive >90 days") - remediation.append("Disable or review stale accounts - automate via JML workflow") + remediation.append( + "Disable or review stale accounts - automate via JML workflow" + ) if unreviewed: findings.append(f"{len(unreviewed)} accounts overdue for access review") remediation.append("Conduct quarterly access reviews; document in evidence") - confidence = max(0.0, 1.0 - (len(stale_accounts) + len(unreviewed)) / max(len(self.users), 1) * 0.5) - status = "implemented" if confidence >= 0.9 else ( - "partially_implemented" if confidence >= 0.6 else "not_implemented" + confidence = max( + 0.0, + 1.0 + - (len(stale_accounts) + len(unreviewed)) / max(len(self.users), 1) * 0.5, + ) + status = ( + "implemented" + if confidence >= 0.9 + else ("partially_implemented" if confidence >= 0.6 else "not_implemented") ) return ICAMAssessmentResult( @@ -165,7 +216,9 @@ def check_least_privilege(self) -> ICAMAssessmentResult: remediation=remediation, ) - async def run_full_assessment(self, db: AsyncSession, trigger: str = "manual") -> List[Dict[str, Any]]: + async def run_full_assessment( + self, db: AsyncSession, trigger: str = "manual" + ) -> List[Dict[str, Any]]: """Run all ICAM assessments and return evidence-ready results.""" assessments = [ self.check_mfa_coverage(), @@ -173,17 +226,19 @@ async def run_full_assessment(self, db: AsyncSession, trigger: str = "manual") - ] results = [] for a in assessments: - results.append({ - "control_id": a.control_id, - "zt_pillar": "User", - "status": a.status, - "confidence": a.confidence, - "findings": a.findings, - "remediation": a.remediation, - "evidence_id": a.evidence_id, - "assessed_at": a.assessed_at.isoformat(), - "owner_agent": "icam", - }) + results.append( + { + "control_id": a.control_id, + "zt_pillar": "User", + "status": a.status, + "confidence": a.confidence, + "findings": a.findings, + "remediation": a.remediation, + "evidence_id": a.evidence_id, + "assessed_at": a.assessed_at.isoformat(), + "owner_agent": "icam", + } + ) # Persist result record = AgentRunRecord( @@ -195,7 +250,7 @@ async def run_full_assessment(self, db: AsyncSession, trigger: str = "manual") - findings={"results": results}, status="completed", created_at=datetime.now(UTC), - completed_at=datetime.now(UTC) + completed_at=datetime.now(UTC), ) db.add(record) await db.commit() @@ -225,7 +280,9 @@ async def list_users(): """Return user inventory for access review evidence.""" return { "total_users": len(_icam.users), - "mfa_coverage_pct": sum(1 for u in _icam.users if u.mfa_enabled) / len(_icam.users) * 100, + "mfa_coverage_pct": sum(1 for u in _icam.users if u.mfa_enabled) + / len(_icam.users) + * 100, "privileged_users": sum(1 for u in _icam.users if u.privileged), "users": [ { diff --git a/agents/mistral_agent/agent.py b/agents/mistral_agent/agent.py index 0a611a0..e09878d 100644 --- a/agents/mistral_agent/agent.py +++ b/agents/mistral_agent/agent.py @@ -6,14 +6,17 @@ to analyze compliance gaps, generate remediation guidance, assess control implementation evidence, and produce POAM recommendations. """ -import os + import json +import os import uuid -from datetime import datetime, UTC -from typing import Optional, List, Dict, Any +from datetime import UTC, datetime +from typing import Any, Dict, List, Optional + from mistralai import Mistral from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession + from backend.db.database import AgentRunRecord, get_db # ─── Mistral Configuration ───────────────────────────────────────────────────── @@ -33,6 +36,7 @@ def __init__(self): if USE_LOCAL: # Use Ollama local endpoint for air-gapped environments from openai import AsyncOpenAI + self.client = AsyncOpenAI( base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434/v1"), api_key="ollama", @@ -42,7 +46,9 @@ def __init__(self): else: if not MISTRAL_API_KEY: self.client = None - print("Warning: MISTRAL_API_KEY not set. Mistral agent will operate in mock mode.") + print( + "Warning: MISTRAL_API_KEY not set. Mistral agent will operate in mock mode." + ) else: self.client = Mistral(api_key=MISTRAL_API_KEY) self.model = MISTRAL_MODEL @@ -53,17 +59,24 @@ async def _chat(self, system: str, user: str, model: Optional[str] = None) -> st """Send a chat completion request to Mistral.""" if not self.client: # Return mock JSON response if no client - return json.dumps({ - "gap_summary": "Mock analysis: Mistral API key missing.", - "severity": "medium", - "zt_impact": "Simulated impact on ZT pillar.", - "confidence_score": 0.5, - "remediation_steps": ["Configure MISTRAL_API_KEY"], - "estimated_effort_days": 1, - "poam_entry": {"milestone": "Setup API", "completion_date": "TBD", "responsible_party": "Admin", "resources": "API Key"}, - "overall_risk": "medium", - "security_issues": [] - }) + return json.dumps( + { + "gap_summary": "Mock analysis: Mistral API key missing.", + "severity": "medium", + "zt_impact": "Simulated impact on ZT pillar.", + "confidence_score": 0.5, + "remediation_steps": ["Configure MISTRAL_API_KEY"], + "estimated_effort_days": 1, + "poam_entry": { + "milestone": "Setup API", + "completion_date": "TBD", + "responsible_party": "Admin", + "resources": "API Key", + }, + "overall_risk": "medium", + "security_issues": [], + } + ) m = model or self.model messages = [ @@ -83,7 +96,15 @@ async def _chat(self, system: str, user: str, model: Optional[str] = None) -> st ) return response.choices[0].message.content - async def record_run(self, db: AsyncSession, trigger: str, scope: str, controls: List[str], findings: Dict[str, Any], status: str = "completed"): + async def record_run( + self, + db: AsyncSession, + trigger: str, + scope: str, + controls: List[str], + findings: Dict[str, Any], + status: str = "completed", + ): record = AgentRunRecord( id=str(uuid.uuid4()), agent_type="mistral", @@ -94,7 +115,7 @@ async def record_run(self, db: AsyncSession, trigger: str, scope: str, controls: status=status, mistral_model=self.model, created_at=datetime.now(UTC), - completed_at=datetime.now(UTC) + completed_at=datetime.now(UTC), ) db.add(record) await db.commit() @@ -250,7 +271,7 @@ async def answer_compliance_question(self, question: str, context: str = "") -> # ─── FastAPI router for Mistral agent endpoints ──────────────────────────────── -from fastapi import APIRouter, HTTPException, Depends +from fastapi import APIRouter, Depends, HTTPException router = APIRouter() agent = MistralComplianceAgent() @@ -281,11 +302,21 @@ async def gap_analysis(req: GapAnalysisRequest, db: AsyncSession = Depends(get_d """Use Mistral to analyze a compliance gap and return remediation steps.""" try: result = await agent.analyze_gap( - req.control_id, req.control_title, req.control_description, - req.zt_pillar, req.current_status, req.existing_evidence + req.control_id, + req.control_title, + req.control_description, + req.zt_pillar, + req.current_status, + req.existing_evidence, ) - await agent.record_run(db, "manual", f"Gap Analysis: {req.control_id}", [req.control_id], result) - return {"control_id": req.control_id, "analysis": result, "model": MISTRAL_MODEL} + await agent.record_run( + db, "manual", f"Gap Analysis: {req.control_id}", [req.control_id], result + ) + return { + "control_id": req.control_id, + "analysis": result, + "model": MISTRAL_MODEL, + } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @@ -297,7 +328,9 @@ async def code_review(req: CodeReviewRequest, db: AsyncSession = Depends(get_db) result = await agent.analyze_code_security( req.code_snippet, req.language, req.relevant_controls ) - await agent.record_run(db, "manual", "Code Review", req.relevant_controls or [], result) + await agent.record_run( + db, "manual", "Code Review", req.relevant_controls or [], result + ) return {"analysis": result, "model": MISTRAL_CODE_MODEL} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) diff --git a/agents/orchestrator/agent.py b/agents/orchestrator/agent.py index a1a7ae3..2258511 100644 --- a/agents/orchestrator/agent.py +++ b/agents/orchestrator/agent.py @@ -8,25 +8,30 @@ Aligns with DoD ZT Orchestration/Automation pillar and Fulcrum LOE 3/4. """ -import uuid + import json -from datetime import datetime, UTC -from typing import Dict, List, Any, Optional -from enum import Enum +import uuid from dataclasses import dataclass, field +from datetime import UTC, datetime +from enum import Enum +from typing import Any, Dict, List, Optional + from fastapi import APIRouter, Depends +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from backend.db.database import get_db, AgentRunRecord, ControlRecord, AssessmentRecord -from sqlalchemy import select, func + +from backend.db.database import (AgentRunRecord, AssessmentRecord, + ControlRecord, get_db) + class AgentType(str, Enum): - ICAM = "icam" # Identity/Credential/Access Mgmt - DATA = "data_protection" # Data-centric security - INFRA = "infrastructure" # Network/micro-segmentation - DEVSECOPS = "devsecops" # DevSecOps/supply chain - GOVERNANCE = "governance" # Policy/risk/POA&M - OPS = "operations" # IR/SIEM/SOAR - MISTRAL = "mistral" # AI analysis engine + ICAM = "icam" # Identity/Credential/Access Mgmt + DATA = "data_protection" # Data-centric security + INFRA = "infrastructure" # Network/micro-segmentation + DEVSECOPS = "devsecops" # DevSecOps/supply chain + GOVERNANCE = "governance" # Policy/risk/POA&M + OPS = "operations" # IR/SIEM/SOAR + MISTRAL = "mistral" # AI analysis engine class TaskTrigger(str, Enum): @@ -41,10 +46,10 @@ class TaskTrigger(str, Enum): class Task: id: str = field(default_factory=lambda: str(uuid.uuid4())) trigger: TaskTrigger = TaskTrigger.MANUAL - scope: str = "" # system name or service + scope: str = "" # system name or service required_controls: List[str] = field(default_factory=list) assigned_agents: List[AgentType] = field(default_factory=list) - status: str = "pending" # pending/running/completed/failed + status: str = "pending" # pending/running/completed/failed created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) completed_at: Optional[datetime] = None findings: Dict[str, Any] = field(default_factory=dict) @@ -53,10 +58,11 @@ class Task: @dataclass class ControlStatus: """Canonical control status record - shared schema for all agents.""" + control_id: str zt_pillar: str - status: str # implemented/partial/planned/not_implemented - confidence: float # 0.0-1.0 ZT confidence score + status: str # implemented/partial/planned/not_implemented + confidence: float # 0.0-1.0 ZT confidence score evidence_ids: List[str] = field(default_factory=list) owner_agent: AgentType = AgentType.GOVERNANCE last_updated: datetime = field(default_factory=lambda: datetime.now(UTC)) @@ -88,12 +94,24 @@ class ComplianceOrchestrator: # Total possible score = 110 points SPRS_DEDUCTIONS = { # High value controls (5 points each) - "AC.2.006": 5, "AC.2.007": 5, "AC.3.017": 5, "AC.3.018": 5, - "IA.3.083": 5, "IA.3.084": 5, "SC.3.177": 5, + "AC.2.006": 5, + "AC.2.007": 5, + "AC.3.017": 5, + "AC.3.018": 5, + "IA.3.083": 5, + "IA.3.084": 5, + "SC.3.177": 5, # Medium value controls (3 points each) - "AC.1.001": 3, "AC.1.002": 3, "IA.1.076": 3, "IA.1.077": 3, - "SC.1.175": 3, "SC.1.176": 3, "SI.1.210": 3, "SI.1.211": 3, - "SI.1.212": 3, "SI.1.213": 3, + "AC.1.001": 3, + "AC.1.002": 3, + "IA.1.076": 3, + "IA.1.077": 3, + "SC.1.175": 3, + "SC.1.176": 3, + "SI.1.210": 3, + "SI.1.211": 3, + "SI.1.212": 3, + "SI.1.213": 3, } def __init__(self): @@ -118,8 +136,16 @@ def create_task( ) # Route based on trigger type if trigger == TaskTrigger.CODE_PUSH: - task.assigned_agents = [AgentType.DEVSECOPS, AgentType.ICAM, AgentType.MISTRAL] - task.required_controls = task.required_controls or ["SI.2.214", "CM.2.061", "AC.1.001"] + task.assigned_agents = [ + AgentType.DEVSECOPS, + AgentType.ICAM, + AgentType.MISTRAL, + ] + task.required_controls = task.required_controls or [ + "SI.2.214", + "CM.2.061", + "AC.1.001", + ] elif trigger == TaskTrigger.INCIDENT: task.assigned_agents = [AgentType.OPS, AgentType.MISTRAL] task.required_controls = task.required_controls or ["IR.2.092", "AU.2.041"] @@ -135,18 +161,15 @@ async def _get_latest_assessments(self, db: AsyncSession): sub_q = ( select( AssessmentRecord.control_id, - func.max(AssessmentRecord.assessment_date).label("max_date") + func.max(AssessmentRecord.assessment_date).label("max_date"), ) .group_by(AssessmentRecord.control_id) .subquery() ) - query = ( - select(AssessmentRecord) - .join( - sub_q, - (AssessmentRecord.control_id == sub_q.c.control_id) & - (AssessmentRecord.assessment_date == sub_q.c.max_date) - ) + query = select(AssessmentRecord).join( + sub_q, + (AssessmentRecord.control_id == sub_q.c.control_id) + & (AssessmentRecord.assessment_date == sub_q.c.max_date), ) result = await db.execute(query) return {a.control_id: a for a in result.scalars().all()} @@ -180,7 +203,7 @@ async def compute_sprs_score(self, db: AsyncSession) -> Dict[str, Any]: "controls_assessed": len(controls), "controls_implemented": implemented_count, "controls_not_implemented": not_implemented_count, - "deductions": deductions_list + "deductions": deductions_list, } async def compute_zt_scorecard(self, db: AsyncSession) -> List[Dict[str, Any]]: @@ -215,15 +238,21 @@ async def compute_zt_scorecard(self, db: AsyncSession) -> List[Dict[str, Any]]: avg_confidence = sum(confidences) / len(confidences) if confidences else 0 - scorecard.append({ - "pillar": pillar, - "total_controls": total, - "implemented": implemented, - "partial": partial, - "not_implemented": total - implemented - partial, - "maturity_pct": round((implemented + 0.5 * partial) / total * 100, 1) if total > 0 else 0, - "confidence_avg": round(avg_confidence, 2), - }) + scorecard.append( + { + "pillar": pillar, + "total_controls": total, + "implemented": implemented, + "partial": partial, + "not_implemented": total - implemented - partial, + "maturity_pct": ( + round((implemented + 0.5 * partial) / total * 100, 1) + if total > 0 + else 0 + ), + "confidence_avg": round(avg_confidence, 2), + } + ) return scorecard async def generate_report(self, db: AsyncSession) -> Dict[str, Any]: @@ -231,7 +260,9 @@ async def generate_report(self, db: AsyncSession) -> Dict[str, Any]: sprs_data = await self.compute_sprs_score(db) zt_scorecard = await self.compute_zt_scorecard(db) - runs_query = select(AgentRunRecord).order_by(AgentRunRecord.created_at.desc()).limit(10) + runs_query = ( + select(AgentRunRecord).order_by(AgentRunRecord.created_at.desc()).limit(10) + ) runs_result = await db.execute(runs_query) runs = runs_result.scalars().all() @@ -246,7 +277,7 @@ async def generate_report(self, db: AsyncSession) -> Dict[str, Any]: "agent": r.agent_type, "status": r.status, "created_at": r.created_at.isoformat(), - "scope": r.scope + "scope": r.scope, } for r in runs ], diff --git a/backend/db/database.py b/backend/db/database.py index 7e53b1e..e35b3be 100644 --- a/backend/db/database.py +++ b/backend/db/database.py @@ -2,12 +2,15 @@ Database initialization and session management. AGI Corporation CMMC Platform 2026 """ -import os + import json -from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession -from sqlalchemy.orm import sessionmaker, DeclarativeBase -from sqlalchemy import Column, String, Integer, Float, DateTime, Text, JSON, select, Index, func -from datetime import datetime, UTC +import os +from datetime import UTC, datetime + +from sqlalchemy import (JSON, Column, DateTime, Float, Index, Integer, String, + Text, func, select) +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import DeclarativeBase, sessionmaker DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./cmmc.db") @@ -17,9 +20,7 @@ connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {}, ) -AsyncSessionLocal = sessionmaker( - engine, class_=AsyncSession, expire_on_commit=False -) +AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) class Base(DeclarativeBase): @@ -35,10 +36,14 @@ class ControlRecord(Base): description = Column(Text) zt_pillar = Column(String) # User/Device/Network/App/Data/Visibility/Automation nist_mapping = Column(String) # e.g. 3.1.1 - status = Column(String, default="not_implemented") # implemented/partial/planned/not_implemented + status = Column( + String, default="not_implemented" + ) # implemented/partial/planned/not_implemented score_value = Column(Integer, default=1) created_at = Column(DateTime, default=lambda: datetime.now(UTC)) - updated_at = Column(DateTime, default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC)) + updated_at = Column( + DateTime, default=lambda: datetime.now(UTC), onupdate=lambda: datetime.now(UTC) + ) class EvidenceRecord(Base): @@ -72,9 +77,7 @@ class AssessmentRecord(Base): next_review = Column(DateTime) poam_required = Column(String, default="false") - __table_args__ = ( - Index('idx_control_date', 'control_id', 'assessment_date'), - ) + __table_args__ = (Index("idx_control_date", "control_id", "assessment_date"),) class AgentRunRecord(Base): @@ -100,7 +103,9 @@ async def init_db(): async with AsyncSessionLocal() as session: result = await session.execute(select(ControlRecord)) if not result.scalars().first(): - schema_path = os.getenv("OSCAL_CATALOG_PATH", "./schema/cmmc_oscal_catalog.json") + schema_path = os.getenv( + "OSCAL_CATALOG_PATH", "./schema/cmmc_oscal_catalog.json" + ) if os.path.exists(schema_path): with open(schema_path) as f: data = json.load(f) @@ -113,7 +118,7 @@ async def init_db(): title=c["title"], description=c["description"], nist_mapping=c.get("nist_mapping"), - score_value=c.get("weight", 1) + score_value=c.get("weight", 1), ) session.add(db_ctrl) await session.commit() @@ -137,26 +142,20 @@ async def get_latest_assessments(db: AsyncSession, control_ids: list[str] = None Get the most recent AssessmentRecord for each control_id. Optionally filter by a list of control_ids. """ - sub_q = ( - select( - AssessmentRecord.control_id, - func.max(AssessmentRecord.assessment_date).label("max_date") - ) - .group_by(AssessmentRecord.control_id) - ) + sub_q = select( + AssessmentRecord.control_id, + func.max(AssessmentRecord.assessment_date).label("max_date"), + ).group_by(AssessmentRecord.control_id) if control_ids: sub_q = sub_q.where(AssessmentRecord.control_id.in_(control_ids)) sub_q = sub_q.subquery() - query = ( - select(AssessmentRecord) - .join( - sub_q, - (AssessmentRecord.control_id == sub_q.c.control_id) & - (AssessmentRecord.assessment_date == sub_q.c.max_date) - ) + query = select(AssessmentRecord).join( + sub_q, + (AssessmentRecord.control_id == sub_q.c.control_id) + & (AssessmentRecord.assessment_date == sub_q.c.max_date), ) result = await db.execute(query) diff --git a/backend/main.py b/backend/main.py index d20826e..337fc78 100644 --- a/backend/main.py +++ b/backend/main.py @@ -7,21 +7,21 @@ Model Context Protocol (MCP). """ +import json +import os +from contextlib import asynccontextmanager + +from dotenv import load_dotenv from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi_mcp import FastApiMCP -from contextlib import asynccontextmanager -import json -import os -from backend.routers import controls, assessment, evidence, reports -from agents.orchestrator import agent as orchestrator -from agents.icam_agent import agent as icam from agents.devsecops_agent import agent as devsecops +from agents.icam_agent import agent as icam from agents.mistral_agent import agent as mistral - +from agents.orchestrator import agent as orchestrator from backend.db.database import init_db -from dotenv import load_dotenv +from backend.routers import assessment, controls, evidence, reports load_dotenv() @@ -48,7 +48,9 @@ async def lifespan(app: FastAPI): # ─── CORS ───────────────────────────────────────────────────────────────────── -cors_origins = json.loads(os.getenv("CORS_ORIGINS", '["http://localhost:3000", "http://localhost:5173"]')) +cors_origins = json.loads( + os.getenv("CORS_ORIGINS", '["http://localhost:3000", "http://localhost:5173"]') +) app.add_middleware( CORSMiddleware, @@ -67,14 +69,19 @@ async def lifespan(app: FastAPI): app.include_router(reports.router, prefix="/api/reports", tags=["Reports"]) # Agent Routers -app.include_router(orchestrator.router, prefix="/api/orchestrator", tags=["Orchestrator"]) +app.include_router( + orchestrator.router, prefix="/api/orchestrator", tags=["Orchestrator"] +) app.include_router(icam.router, prefix="/api/agents/icam", tags=["ICAM Agent"]) -app.include_router(devsecops.router, prefix="/api/agents/devsecops", tags=["DevSecOps Agent"]) +app.include_router( + devsecops.router, prefix="/api/agents/devsecops", tags=["DevSecOps Agent"] +) app.include_router(mistral.router, prefix="/api/agents/mistral", tags=["Mistral Agent"]) # ─── Health Check ───────────────────────────────────────────────────────────── + @app.get("/", tags=["Health"]) async def root(): return { diff --git a/backend/models/control.py b/backend/models/control.py index 897063b..59d4a07 100644 --- a/backend/models/control.py +++ b/backend/models/control.py @@ -1,10 +1,12 @@ """ Pydantic models for CMMC Controls """ -from pydantic import BaseModel, Field -from typing import Optional, List + +from datetime import UTC, datetime from enum import Enum -from datetime import datetime, UTC +from typing import List, Optional + +from pydantic import BaseModel, Field class CMMCLevel(str, Enum): @@ -41,15 +43,26 @@ class ImplementationStatus(str, Enum): class Control(BaseModel): """CMMC Control - aligned with OSCAL catalog model.""" + id: str = Field(..., description="Control ID (e.g., AC.1.001)") title: str = Field(..., description="Control title") description: str = Field(..., description="Control description / requirement") domain: ControlDomain = Field(..., description="Control domain / family") - level: CMMCLevel = Field(..., description="Minimum CMMC level requiring this control") - nist_mapping: Optional[str] = Field(None, description="Mapped NIST SP 800-171 control ID") - discussion: Optional[str] = Field(None, description="CMMC discussion / implementation guidance") - assessment_objectives: Optional[List[str]] = Field(default_factory=list, description="Assessment objectives") - weight: int = Field(default=1, description="SPRS point weight (deducted if not implemented)") + level: CMMCLevel = Field( + ..., description="Minimum CMMC level requiring this control" + ) + nist_mapping: Optional[str] = Field( + None, description="Mapped NIST SP 800-171 control ID" + ) + discussion: Optional[str] = Field( + None, description="CMMC discussion / implementation guidance" + ) + assessment_objectives: Optional[List[str]] = Field( + default_factory=list, description="Assessment objectives" + ) + weight: int = Field( + default=1, description="SPRS point weight (deducted if not implemented)" + ) created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) class Config: @@ -61,13 +74,14 @@ class Config: "domain": "AC", "level": "Level 1", "nist_mapping": "3.1.1", - "weight": 5 + "weight": 5, } } class ControlResponse(BaseModel): """API response wrapper for a single control.""" + control: Control implementation_status: Optional[ImplementationStatus] = None evidence_count: int = 0 @@ -78,6 +92,7 @@ class ControlResponse(BaseModel): class ControlListResponse(BaseModel): """API response for list of controls.""" + controls: List[ControlResponse] total: int level_filter: Optional[CMMCLevel] = None @@ -86,6 +101,7 @@ class ControlListResponse(BaseModel): class ControlUpdate(BaseModel): """Request body for updating control implementation status.""" + implementation_status: ImplementationStatus notes: Optional[str] = None responsible_party: Optional[str] = None diff --git a/backend/models/evidence.py b/backend/models/evidence.py index 8b7c6e7..5b3ddb8 100644 --- a/backend/models/evidence.py +++ b/backend/models/evidence.py @@ -2,10 +2,12 @@ Pydantic models for Evidence artifacts. AGI Corporation 2026 """ -from pydantic import BaseModel, Field -from typing import Optional, List, Dict, Any -from enum import Enum + from datetime import datetime +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field class EvidenceType(str, Enum): @@ -52,6 +54,7 @@ class EvidenceListResponse(BaseModel): class EvidenceSchema(BaseModel): """Canonical evidence schema for agent interoperability.""" + id: str type: EvidenceType source_system: str diff --git a/backend/routers/assessment.py b/backend/routers/assessment.py index a37f2e8..ff7fbe0 100644 --- a/backend/routers/assessment.py +++ b/backend/routers/assessment.py @@ -2,16 +2,19 @@ Assessment Router - SPRS score calculation and compliance dashboard. These endpoints become MCP tools: calculate_sprs_score, get_compliance_dashboard. """ -from fastapi import APIRouter, Depends, HTTPException -from pydantic import BaseModel -from typing import Dict, List, Optional + import os import uuid -from datetime import datetime, UTC +from datetime import UTC, datetime +from typing import Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, func -from backend.db.database import get_db, ControlRecord, AssessmentRecord, AgentRunRecord, get_latest_assessments +from backend.db.database import (AgentRunRecord, AssessmentRecord, + ControlRecord, get_db, get_latest_assessments) router = APIRouter() @@ -20,12 +23,24 @@ # Total possible score = 110 points SPRS_DEDUCTIONS = { # High value controls (5 points each) - "AC.2.006": 5, "AC.2.007": 5, "AC.3.017": 5, "AC.3.018": 5, - "IA.3.083": 5, "IA.3.084": 5, "SC.3.177": 5, - # Medium value controls (3 points each) - "AC.1.001": 3, "AC.1.002": 3, "IA.1.076": 3, "IA.1.077": 3, - "SC.1.175": 3, "SC.1.176": 3, "SI.1.210": 3, "SI.1.211": 3, - "SI.1.212": 3, "SI.1.213": 3, + "AC.2.006": 5, + "AC.2.007": 5, + "AC.3.017": 5, + "AC.3.018": 5, + "IA.3.083": 5, + "IA.3.084": 5, + "SC.3.177": 5, + # Medium value controls (3 points each) + "AC.1.001": 3, + "AC.1.002": 3, + "IA.1.076": 3, + "IA.1.077": 3, + "SC.1.175": 3, + "SC.1.176": 3, + "SI.1.210": 3, + "SI.1.211": 3, + "SI.1.212": 3, + "SI.1.213": 3, } @@ -55,20 +70,25 @@ class SPRSResult(BaseModel): certification_level: str assessment_date: str + @router.get( "/dashboard", response_model=DashboardSummary, summary="Get Compliance Dashboard", - description="Get overall CMMC compliance posture summary including implementation percentages, SPRS score, and breakdown by domain and level." + description="Get overall CMMC compliance posture summary including implementation percentages, SPRS score, and breakdown by domain and level.", ) async def get_compliance_dashboard(db: AsyncSession = Depends(get_db)): result = await db.execute(select(ControlRecord)) controls = result.scalars().all() assessments_map = await get_latest_assessments(db) - + by_domain = {} - by_level = {"Level 1": {"total": 0, "implemented": 0}, "Level 2": {"total": 0, "implemented": 0}, "Level 3": {"total": 0, "implemented": 0}} + by_level = { + "Level 1": {"total": 0, "implemented": 0}, + "Level 2": {"total": 0, "implemented": 0}, + "Level 3": {"total": 0, "implemented": 0}, + } implemented = not_implemented = partial = not_started = not_applicable = 0 sprs_score = 110 # Start at max, deduct for non-implemented @@ -105,7 +125,7 @@ async def get_compliance_dashboard(db: AsyncSession = Depends(get_db)): total = len(controls) pct = (implemented / total * 100) if total > 0 else 0 - + if pct >= 100: readiness = "Ready for Certification" elif pct >= 80: @@ -126,7 +146,7 @@ async def get_compliance_dashboard(db: AsyncSession = Depends(get_db)): sprs_score=max(sprs_score, -203), # SPRS floor is -203 by_domain=by_domain, by_level=by_level, - readiness=readiness + readiness=readiness, ) @@ -134,7 +154,7 @@ async def get_compliance_dashboard(db: AsyncSession = Depends(get_db)): "/sprs", response_model=SPRSResult, summary="Calculate SPRS Score", - description="Calculate the DoD Supplier Performance Risk System (SPRS) score based on current control implementation status. Score ranges from -203 to 110." + description="Calculate the DoD Supplier Performance Risk System (SPRS) score based on current control implementation status. Score ranges from -203 to 110.", ) async def calculate_sprs_score(db: AsyncSession = Depends(get_db)): result = await db.execute(select(ControlRecord)) @@ -153,7 +173,12 @@ async def calculate_sprs_score(db: AsyncSession = Depends(get_db)): if status == "implemented": implemented_count += 1 - elif status in ["not_implemented", "not_started", "partially_implemented", "partial"]: + elif status in [ + "not_implemented", + "not_started", + "partially_implemented", + "partial", + ]: not_implemented_count += 1 deduction = SPRS_DEDUCTIONS.get(cid, 1) sprs -= deduction @@ -176,10 +201,13 @@ async def calculate_sprs_score(db: AsyncSession = Depends(get_db)): controls_not_implemented=not_implemented_count, deductions=deductions_list, certification_level=cert_level, - assessment_date=datetime.now(UTC).date().isoformat() + assessment_date=datetime.now(UTC).date().isoformat(), ) -@router.post("/promote/{run_id}", summary="Promote agent findings to official assessment records") + +@router.post( + "/promote/{run_id}", summary="Promote agent findings to official assessment records" +) async def promote_agent_run(run_id: str, db: AsyncSession = Depends(get_db)): """Convert an agent execution run into official assessment records.""" query = select(AgentRunRecord).where(AgentRunRecord.id == run_id) @@ -205,7 +233,12 @@ async def promote_agent_run(run_id: str, db: AsyncSession = Depends(get_db)): evidence_ids=[res["evidence_id"]], assessor=f"Agent: {run.agent_type}", assessment_date=datetime.now(UTC), - poam_required="true" if res["status"] in ["partial", "not_implemented", "partially_implemented"] else "false" + poam_required=( + "true" + if res["status"] + in ["partial", "not_implemented", "partially_implemented"] + else "false" + ), ) db.add(new_ass) promoted_count += 1 @@ -228,10 +261,18 @@ async def promote_agent_run(run_id: str, db: AsyncSession = Depends(get_db)): evidence_ids=[findings.get("image_scan", {}).get("evidence_id")], assessor=f"Agent: {run.agent_type}", assessment_date=datetime.now(UTC), - poam_required="true" if status in ["partial", "not_implemented", "partially_implemented"] else "false" + poam_required=( + "true" + if status in ["partial", "not_implemented", "partially_implemented"] + else "false" + ), ) db.add(new_ass) promoted_count += 1 await db.commit() - return {"status": "promoted", "run_id": run_id, "assessments_created": promoted_count} + return { + "status": "promoted", + "run_id": run_id, + "assessments_created": promoted_count, + } diff --git a/backend/routers/controls.py b/backend/routers/controls.py index 76b9ad0..92a7be5 100644 --- a/backend/routers/controls.py +++ b/backend/routers/controls.py @@ -2,30 +2,39 @@ CMMC Controls Router - FastAPI endpoints for control management. These endpoints are automatically exposed as MCP tools via fastapi-mcp. """ -from fastapi import APIRouter, HTTPException, Query, Depends -from typing import Optional, List + +from typing import List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, func -from backend.db.database import get_db, ControlRecord, AssessmentRecord, get_latest_assessments -from backend.models.control import ( - Control, ControlResponse, ControlListResponse, - ControlUpdate, CMMCLevel, ControlDomain, ImplementationStatus -) +from backend.db.database import (AssessmentRecord, ControlRecord, get_db, + get_latest_assessments) +from backend.models.control import (CMMCLevel, Control, ControlDomain, + ControlListResponse, ControlResponse, + ControlUpdate, ImplementationStatus) router = APIRouter() + @router.get( "/", response_model=ControlListResponse, summary="List CMMC Controls", - description="List all CMMC controls, optionally filtered by level or domain. Returns controls with current implementation status." + description="List all CMMC controls, optionally filtered by level or domain. Returns controls with current implementation status.", ) async def list_controls( - level: Optional[CMMCLevel] = Query(None, description="Filter by CMMC level (Level 1, Level 2, Level 3)"), - domain: Optional[ControlDomain] = Query(None, description="Filter by control domain (AC, AU, CM, etc.)"), - status: Optional[ImplementationStatus] = Query(None, description="Filter by implementation status"), - db: AsyncSession = Depends(get_db) + level: Optional[CMMCLevel] = Query( + None, description="Filter by CMMC level (Level 1, Level 2, Level 3)" + ), + domain: Optional[ControlDomain] = Query( + None, description="Filter by control domain (AC, AU, CM, etc.)" + ), + status: Optional[ImplementationStatus] = Query( + None, description="Filter by implementation status" + ), + db: AsyncSession = Depends(get_db), ): """ Optimized list_controls using single query JOIN for performance. @@ -34,7 +43,7 @@ async def list_controls( sub_q = ( select( AssessmentRecord.control_id, - func.max(AssessmentRecord.assessment_date).label("max_date") + func.max(AssessmentRecord.assessment_date).label("max_date"), ) .group_by(AssessmentRecord.control_id) .subquery() @@ -43,14 +52,11 @@ async def list_controls( # 2. Main query joining Controls with latest Assessments query = ( select(ControlRecord, AssessmentRecord) - .outerjoin( - sub_q, - ControlRecord.id == sub_q.c.control_id - ) + .outerjoin(sub_q, ControlRecord.id == sub_q.c.control_id) .outerjoin( AssessmentRecord, - (AssessmentRecord.control_id == sub_q.c.control_id) & - (AssessmentRecord.assessment_date == sub_q.c.max_date) + (AssessmentRecord.control_id == sub_q.c.control_id) + & (AssessmentRecord.assessment_date == sub_q.c.max_date), ) ) @@ -78,25 +84,32 @@ async def list_controls( domain=c.domain, level=c.level, nist_mapping=c.nist_mapping, - weight=c.score_value + weight=c.score_value, ), implementation_status=a.status if a else "not_started", - evidence_count=len(a.evidence_ids) if a and isinstance(a.evidence_ids, list) else 0, + evidence_count=( + len(a.evidence_ids) if a and isinstance(a.evidence_ids, list) else 0 + ), notes=a.notes if a else None, confidence=a.confidence if a else 0.0, - poam_required=(a.poam_required == "true") if a else False + poam_required=(a.poam_required == "true") if a else False, ) for c, a in rows ] - return ControlListResponse(controls=responses, total=len(responses), level_filter=level, domain_filter=domain) + return ControlListResponse( + controls=responses, + total=len(responses), + level_filter=level, + domain_filter=domain, + ) @router.get( "/{control_id}", response_model=ControlResponse, summary="Get Control Detail", - description="Get full details of a specific CMMC control by its ID (e.g., AC.1.001)." + description="Get full details of a specific CMMC control by its ID (e.g., AC.1.001).", ) async def get_control_detail(control_id: str, db: AsyncSession = Depends(get_db)): query = select(ControlRecord).where(ControlRecord.id == control_id) @@ -106,7 +119,11 @@ async def get_control_detail(control_id: str, db: AsyncSession = Depends(get_db) if not c: raise HTTPException(status_code=404, detail=f"Control {control_id} not found") - a_query = select(AssessmentRecord).where(AssessmentRecord.control_id == control_id).order_by(AssessmentRecord.assessment_date.desc()) + a_query = ( + select(AssessmentRecord) + .where(AssessmentRecord.control_id == control_id) + .order_by(AssessmentRecord.assessment_date.desc()) + ) a_result = await db.execute(a_query) assessment = a_result.scalars().first() @@ -118,13 +135,17 @@ async def get_control_detail(control_id: str, db: AsyncSession = Depends(get_db) domain=c.domain, level=c.level, nist_mapping=c.nist_mapping, - weight=c.score_value + weight=c.score_value, ), implementation_status=assessment.status if assessment else "not_started", - evidence_count=len(assessment.evidence_ids) if assessment and isinstance(assessment.evidence_ids, list) else 0, + evidence_count=( + len(assessment.evidence_ids) + if assessment and isinstance(assessment.evidence_ids, list) + else 0 + ), notes=assessment.notes if assessment else None, confidence=assessment.confidence if assessment else 0.0, - poam_required=(assessment.poam_required == "true") if assessment else False + poam_required=(assessment.poam_required == "true") if assessment else False, ) @@ -132,9 +153,11 @@ async def get_control_detail(control_id: str, db: AsyncSession = Depends(get_db) "/{control_id}", response_model=ControlResponse, summary="Update Control Assessment Status", - description="Update the implementation status, notes, and responsible party for a CMMC control." + description="Update the implementation status, notes, and responsible party for a CMMC control.", ) -async def update_control_status(control_id: str, update: ControlUpdate, db: AsyncSession = Depends(get_db)): +async def update_control_status( + control_id: str, update: ControlUpdate, db: AsyncSession = Depends(get_db) +): query = select(ControlRecord).where(ControlRecord.id == control_id) result = await db.execute(query) c = result.scalar_one_or_none() @@ -143,7 +166,7 @@ async def update_control_status(control_id: str, update: ControlUpdate, db: Asyn raise HTTPException(status_code=404, detail=f"Control {control_id} not found") import uuid - from datetime import datetime, UTC + from datetime import UTC, datetime new_assessment = AssessmentRecord( id=str(uuid.uuid4()), @@ -155,7 +178,7 @@ async def update_control_status(control_id: str, update: ControlUpdate, db: Asyn assessment_date=datetime.now(UTC), evidence_ids=update.evidence_ids or [], confidence=update.confidence or 0.0, - poam_required="true" if update.poam_required else "false" + poam_required="true" if update.poam_required else "false", ) db.add(new_assessment) await db.commit() @@ -168,12 +191,12 @@ async def update_control_status(control_id: str, update: ControlUpdate, db: Asyn domain=c.domain, level=c.level, nist_mapping=c.nist_mapping, - weight=c.score_value + weight=c.score_value, ), implementation_status=update.implementation_status, notes=update.notes, confidence=new_assessment.confidence, - poam_required=update.poam_required + poam_required=update.poam_required, ) @@ -181,7 +204,9 @@ async def update_control_status(control_id: str, update: ControlUpdate, db: Asyn "/domain/{domain}", response_model=ControlListResponse, summary="Get Controls by Domain", - description="Get all CMMC controls for a specific domain (e.g., AC for Access Control)." + description="Get all CMMC controls for a specific domain (e.g., AC for Access Control).", ) -async def get_controls_by_domain(domain: ControlDomain, db: AsyncSession = Depends(get_db)): +async def get_controls_by_domain( + domain: ControlDomain, db: AsyncSession = Depends(get_db) +): return await list_controls(domain=domain, db=db) diff --git a/backend/routers/evidence.py b/backend/routers/evidence.py index c193263..c946fe4 100644 --- a/backend/routers/evidence.py +++ b/backend/routers/evidence.py @@ -6,21 +6,27 @@ Evidence types: log, scan, policy, diagram, screenshot, report, configuration. All evidence records include ZT pillar, capability ID, and control mappings. """ + import uuid -from fastapi import APIRouter, HTTPException, Depends +from datetime import UTC, datetime from typing import List, Optional -from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select -from datetime import datetime, UTC +from sqlalchemy.ext.asyncio import AsyncSession -from backend.db.database import get_db, EvidenceRecord -from backend.models.evidence import EvidenceCreate, EvidenceResponse, EvidenceListResponse +from backend.db.database import EvidenceRecord, get_db +from backend.models.evidence import (EvidenceCreate, EvidenceListResponse, + EvidenceResponse) router = APIRouter() -@router.post("/", response_model=EvidenceResponse, - summary="Upload evidence artifact for a CMMC control") +@router.post( + "/", + response_model=EvidenceResponse, + summary="Upload evidence artifact for a CMMC control", +) async def create_evidence( evidence: EvidenceCreate, db: AsyncSession = Depends(get_db), @@ -63,8 +69,9 @@ async def create_evidence( ) -@router.get("/", response_model=EvidenceListResponse, - summary="List all evidence artifacts") +@router.get( + "/", response_model=EvidenceListResponse, summary="List all evidence artifacts" +) async def list_evidence( control_id: Optional[str] = None, zt_pillar: Optional[str] = None, @@ -102,8 +109,11 @@ async def list_evidence( return EvidenceListResponse(total=len(items), evidence=items) -@router.get("/{evidence_id}", response_model=EvidenceResponse, - summary="Get a specific evidence artifact") +@router.get( + "/{evidence_id}", + response_model=EvidenceResponse, + summary="Get a specific evidence artifact", +) async def get_evidence( evidence_id: str, db: AsyncSession = Depends(get_db), diff --git a/backend/routers/reports.py b/backend/routers/reports.py index 6549f98..0d01da7 100644 --- a/backend/routers/reports.py +++ b/backend/routers/reports.py @@ -5,20 +5,25 @@ Generates System Security Plans (SSP) and Plans of Action & Milestones (POA&M) from the current assessment state. Output formats: Markdown, JSON, CSV. """ -from fastapi import APIRouter, Depends -from fastapi.responses import PlainTextResponse -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, func -from typing import List, Dict, Any -from datetime import datetime, date, UTC + import csv import io import json +from datetime import UTC, date, datetime +from typing import Any, Dict, List + +from fastapi import APIRouter, Depends +from fastapi.responses import PlainTextResponse +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession -from backend.db.database import get_db, AssessmentRecord, ControlRecord, EvidenceRecord, get_latest_assessments +from backend.db.database import (AssessmentRecord, ControlRecord, + EvidenceRecord, get_db, + get_latest_assessments) router = APIRouter() + @router.get("/ssp", summary="Generate System Security Plan (SSP) in Markdown") async def generate_ssp( system_name: str = "AGI Corp CMMC System", @@ -36,14 +41,22 @@ async def generate_ssp( controls = {c.id: c for c in controls_result.scalars().all()} # Count by status - status_counts = {"implemented": 0, "partial": 0, "planned": 0, "not_implemented": 0, "na": 0} + status_counts = { + "implemented": 0, + "partial": 0, + "planned": 0, + "not_implemented": 0, + "na": 0, + } for a in assessments: if a.status in status_counts: status_counts[a.status] += 1 elif a.status == "partially_implemented": - status_counts["partial"] += 1 + status_counts["partial"] += 1 - sprs_estimate = 110 - (status_counts["not_implemented"] * 1 + status_counts["partial"] * 0.5) + sprs_estimate = 110 - ( + status_counts["not_implemented"] * 1 + status_counts["partial"] * 0.5 + ) sprs_estimate = max(-203, round(sprs_estimate, 0)) ssp = f"""# System Security Plan (SSP) @@ -130,35 +143,54 @@ async def generate_poam( output = io.StringIO() writer = csv.writer(output) - writer.writerow([ - "Control ID", "Domain", "Title", "ZT Pillar", "Status", - "Confidence", "Milestone", "Target Date", "Responsible Party", - "Resources Required", "Notes" - ]) + writer.writerow( + [ + "Control ID", + "Domain", + "Title", + "ZT Pillar", + "Status", + "Confidence", + "Milestone", + "Target Date", + "Responsible Party", + "Resources Required", + "Notes", + ] + ) for a in assessments: - if a.status in ["not_implemented", "partial", "planned", "partially_implemented"]: + if a.status in [ + "not_implemented", + "partial", + "planned", + "partially_implemented", + ]: ctrl = controls.get(a.control_id) domain = a.control_id.split(".")[0] if "." in a.control_id else "" - writer.writerow([ - a.control_id, - domain, - ctrl.title if ctrl else "", - ctrl.zt_pillar if ctrl else "", - a.status, - f"{a.confidence:.0%}", - f"Implement {a.control_id}", - a.next_review.strftime("%Y-%m-%d") if a.next_review else "TBD", - a.assessor or "ISSO", - "TBD", - a.notes or "", - ]) + writer.writerow( + [ + a.control_id, + domain, + ctrl.title if ctrl else "", + ctrl.zt_pillar if ctrl else "", + a.status, + f"{a.confidence:.0%}", + f"Implement {a.control_id}", + a.next_review.strftime("%Y-%m-%d") if a.next_review else "TBD", + a.assessor or "ISSO", + "TBD", + a.notes or "", + ] + ) csv_content = output.getvalue() return PlainTextResponse( content=csv_content, media_type="text/csv", - headers={"Content-Disposition": f'attachment; filename="poam_{system_name.replace(" ","_")}.csv"'}, + headers={ + "Content-Disposition": f'attachment; filename="poam_{system_name.replace(" ","_")}.csv"' + }, ) @@ -170,17 +202,28 @@ async def get_dashboard( assessments_map = await get_latest_assessments(db) assessments = list(assessments_map.values()) - status_counts = {"implemented": 0, "partial": 0, "planned": 0, "not_implemented": 0, "na": 0} + status_counts = { + "implemented": 0, + "partial": 0, + "planned": 0, + "not_implemented": 0, + "na": 0, + } for a in assessments: if a.status in status_counts: status_counts[a.status] += 1 elif a.status == "partially_implemented": - status_counts["partial"] += 1 + status_counts["partial"] += 1 total_assessed = len(assessments) implemented = status_counts["implemented"] - sprs_score = max(-203, round(110 - (status_counts["not_implemented"] + status_counts["partial"] * 0.5))) + sprs_score = max( + -203, + round( + 110 - (status_counts["not_implemented"] + status_counts["partial"] * 0.5) + ), + ) # Get total controls count for accurate percentage controls_result = await db.execute(select(func.count(ControlRecord.id))) @@ -193,7 +236,9 @@ async def get_dashboard( "total_controls": total_controls, "assessed_controls": total_assessed, "status_breakdown": status_counts, - "overall_compliance_pct": round(implemented / total_controls * 100, 1) if total_controls else 0, + "overall_compliance_pct": ( + round(implemented / total_controls * 100, 1) if total_controls else 0 + ), "zt_pillars": [ {"pillar": "User", "domains": ["AC", "IA", "PS"]}, {"pillar": "Device", "domains": ["CM", "MA", "PE"]}, diff --git a/benchmark_controls.py b/benchmark_controls.py index 907ef8b..c64b298 100644 --- a/benchmark_controls.py +++ b/benchmark_controls.py @@ -1,19 +1,20 @@ - import asyncio -import time import os -import uuid import statistics -from datetime import datetime, timedelta, UTC -from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession +import time +import uuid +from datetime import UTC, datetime, timedelta + +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker -from httpx import AsyncClient, ASGITransport # Set up environment for benchmark os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./benchmark.db" +from backend.db.database import AssessmentRecord, Base, ControlRecord, init_db from backend.main import app -from backend.db.database import Base, ControlRecord, AssessmentRecord, init_db + async def seed_benchmark_data(): engine = create_async_engine("sqlite+aiosqlite:///./benchmark.db") @@ -21,7 +22,9 @@ async def seed_benchmark_data(): await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.create_all) - AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + AsyncSessionLocal = sessionmaker( + engine, class_=AsyncSession, expire_on_commit=False + ) async with AsyncSessionLocal() as session: # 1. Seed 110 controls (Standard CMMC Level 2) @@ -34,7 +37,7 @@ async def seed_benchmark_data(): level="Level 2", title=f"Benchmark Control {i+1}", description=f"Description for control {i+1}", - score_value=1 if i % 5 != 0 else 5 + score_value=1 if i % 5 != 0 else 5, ) controls.append(ctrl) session.add(ctrl) @@ -47,48 +50,55 @@ async def seed_benchmark_data(): id=str(uuid.uuid4()), control_id=ctrl.id, status="implemented" if j == 9 else "not_implemented", - assessment_date=datetime.now(UTC) - timedelta(days=10-j), + assessment_date=datetime.now(UTC) - timedelta(days=10 - j), confidence=0.9, notes=f"Assessment history {j}", - evidence_ids=["ev-1", "ev-2"] + evidence_ids=["ev-1", "ev-2"], ) session.add(assessment) await session.commit() await engine.dispose() + async def run_benchmark(): print("Seeding benchmark data...") await seed_benchmark_data() print("Running benchmark on /api/controls/ ...") - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: # 1. No filter print("\nTest 1: No filter") latencies = [] - await ac.get("/api/controls/") # Warmup + await ac.get("/api/controls/") # Warmup for _ in range(50): start_time = time.perf_counter() response = await ac.get("/api/controls/") end_time = time.perf_counter() latencies.append((end_time - start_time) * 1000) - print(f"Avg: {statistics.mean(latencies):.2f} ms, P95: {statistics.quantiles(latencies, n=20)[18]:.2f} ms") + print( + f"Avg: {statistics.mean(latencies):.2f} ms, P95: {statistics.quantiles(latencies, n=20)[18]:.2f} ms" + ) # 2. Status filter (should be faster with SQL filtering) print("\nTest 2: Status filter (implemented)") latencies = [] - await ac.get("/api/controls/?status=implemented") # Warmup + await ac.get("/api/controls/?status=implemented") # Warmup for _ in range(50): start_time = time.perf_counter() response = await ac.get("/api/controls/?status=implemented") end_time = time.perf_counter() latencies.append((end_time - start_time) * 1000) - print(f"Avg: {statistics.mean(latencies):.2f} ms, P95: {statistics.quantiles(latencies, n=20)[18]:.2f} ms") + print( + f"Avg: {statistics.mean(latencies):.2f} ms, P95: {statistics.quantiles(latencies, n=20)[18]:.2f} ms" + ) avg_latency = statistics.mean(latencies) median_latency = statistics.median(latencies) - p95_latency = statistics.quantiles(latencies, n=20)[18] # 19th 5-percentile is p95 + p95_latency = statistics.quantiles(latencies, n=20)[18] # 19th 5-percentile is p95 print(f"\nResults (50 requests):") print(f"Average Latency: {avg_latency:.2f} ms") @@ -99,5 +109,6 @@ async def run_benchmark(): if os.path.exists("./benchmark.db"): os.remove("./benchmark.db") + if __name__ == "__main__": asyncio.run(run_benchmark()) diff --git a/tests/test_agents.py b/tests/test_agents.py index 5815263..382c076 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -1,15 +1,18 @@ +import json +import os import pytest -import os -import json -from httpx import AsyncClient, ASGITransport +from httpx import ASGITransport, AsyncClient + +from backend.db.database import AsyncSessionLocal, Base, engine, init_db from backend.main import app -from backend.db.database import init_db, engine, Base, AsyncSessionLocal + @pytest.fixture(scope="session") def anyio_backend(): return "asyncio" + @pytest.fixture(scope="session", autouse=True) async def setup_db(): os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./test_agents.db" @@ -20,41 +23,53 @@ async def setup_db(): if os.path.exists("./test_agents.db"): os.remove("./test_agents.db") + @pytest.mark.anyio async def test_icam_assess(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: response = await ac.get("/api/agents/icam/assess") assert response.status_code == 200 data = response.json() assert data["agent"] == "icam" assert len(data["assessments"]) > 0 + @pytest.mark.anyio async def test_devsecops_assess(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: response = await ac.get("/api/agents/devsecops/assess/test-service") assert response.status_code == 200 data = response.json() assert data["agent"] == "devsecops" assert "image_scan" in data + @pytest.mark.anyio async def test_orchestrator_scorecard(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: response = await ac.get("/api/orchestrator/scorecard") assert response.status_code == 200 data = response.json() assert "scorecard" in data assert "sprs" in data + @pytest.mark.anyio async def test_mistral_gap_analysis_mock(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: req = { "control_id": "AC.1.001", "control_title": "Limit access", "control_description": "Desc", - "zt_pillar": "User" + "zt_pillar": "User", } response = await ac.post("/api/agents/mistral/gap-analysis", json=req) assert response.status_code == 200 @@ -63,18 +78,25 @@ async def test_mistral_gap_analysis_mock(): if not os.getenv("MISTRAL_API_KEY"): assert "Mock analysis" in data["analysis"]["gap_summary"] + @pytest.mark.anyio async def test_agent_run_promotion(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: # 1. Trigger an agent run (ICAM) assess_resp = await ac.get("/api/agents/icam/assess") assert assess_resp.status_code == 200 # 2. Get the run ID from the database - from backend.db.database import AgentRunRecord from sqlalchemy import select + + from backend.db.database import AgentRunRecord + async with AsyncSessionLocal() as session: - res = await session.execute(select(AgentRunRecord).order_by(AgentRunRecord.created_at.desc())) + res = await session.execute( + select(AgentRunRecord).order_by(AgentRunRecord.created_at.desc()) + ) run = res.scalars().first() run_id = run.id diff --git a/tests/test_backend.py b/tests/test_backend.py index a566eb3..fc40807 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,15 +1,18 @@ +import asyncio +import os import pytest -import os -import asyncio -from httpx import AsyncClient, ASGITransport +from httpx import ASGITransport, AsyncClient + +from backend.db.database import Base, engine, init_db from backend.main import app -from backend.db.database import init_db, engine, Base + @pytest.fixture(scope="session") def anyio_backend(): return "asyncio" + @pytest.fixture(scope="session", autouse=True) async def setup_db(): # Use a separate test database @@ -22,16 +25,22 @@ async def setup_db(): if os.path.exists("./test_api.db"): os.remove("./test_api.db") + @pytest.mark.anyio async def test_health_check(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: response = await ac.get("/health") assert response.status_code == 200 assert response.json() == {"status": "ok"} + @pytest.mark.anyio async def test_list_controls(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: response = await ac.get("/api/controls/") assert response.status_code == 200 data = response.json() @@ -41,9 +50,12 @@ async def test_list_controls(): control_ids = [c["control"]["id"] for c in data["controls"]] assert "AC.1.001" in control_ids + @pytest.mark.anyio async def test_update_and_score(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: # 1. Check SPRS score initial response = await ac.get("/api/assessment/sprs") data = response.json() @@ -53,7 +65,7 @@ async def test_update_and_score(): update_data = { "implementation_status": "implemented", "notes": "Test fix", - "responsible_party": "Tester" + "responsible_party": "Tester", } patch_response = await ac.patch("/api/controls/AC.1.001", json=update_data) assert patch_response.status_code == 200 @@ -64,9 +76,12 @@ async def test_update_and_score(): # AC.1.001 has deduction 3. assert data["sprs_score"] == initial_score + 3 + @pytest.mark.anyio async def test_reports(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: # 1. Check dashboard resp = await ac.get("/api/reports/dashboard") assert resp.status_code == 200 @@ -81,16 +96,19 @@ async def test_reports(): assert resp.status_code == 200 assert "Control ID,Domain" in resp.text + @pytest.mark.anyio async def test_update_with_advanced_fields(): - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: update_data = { "implementation_status": "implemented", "notes": "Advanced update", "responsible_party": "Tester", "confidence": 0.95, "poam_required": False, - "evidence_ids": ["ev-123"] + "evidence_ids": ["ev-123"], } patch_response = await ac.patch("/api/controls/AC.1.002", json=update_data) assert patch_response.status_code == 200 From a2393f672fad200828f4507db7b15bceeaaa05bf Mon Sep 17 00:00:00 2001 From: AGI-Corporation <186229839+AGI-Corporation@users.noreply.github.com> Date: Sun, 15 Mar 2026 08:26:41 +0000 Subject: [PATCH 3/6] =?UTF-8?q?=E2=9A=A1=20Bolt:=20optimize=20performance?= =?UTF-8?q?=20and=20fix=20CI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Performance: - Add composite index `idx_control_date` to `AssessmentRecord`. - Implement optimized `get_latest_assessments` in `database.py`. - Refactor `list_controls` to use single SQL JOIN and DB-level filtering. - Remove redundant query logic from routers. - Add `benchmark_controls.py`. CI Fix: - Add `.github/workflows/python-ci.yml` using standard pip and pytest. - (Note: platform billing issue still present, but workflow is now correctly configured). Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> --- .github/workflows/python-ci.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/python-ci.yml diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml new file mode 100644 index 0000000..944744e --- /dev/null +++ b/.github/workflows/python-ci.yml @@ -0,0 +1,24 @@ +name: Python CI + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: '3.12' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + - name: Run tests + run: | + python3 -m pytest tests/test_backend.py From a0c9f1a4a6e6e17121a480695b414fd75a73550d Mon Sep 17 00:00:00 2001 From: AGI-Corporation <186229839+AGI-Corporation@users.noreply.github.com> Date: Sun, 15 Mar 2026 08:29:12 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=E2=9A=A1=20Bolt:=20optimize=20performance?= =?UTF-8?q?=20and=20clean=20up=20CI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Performance: - Add composite index `idx_control_date` to `AssessmentRecord`. - Implement optimized `get_latest_assessments` in `database.py`. - Refactor `list_controls` to use single SQL JOIN and DB-level filtering. - Remove redundant query logic from routers. - Add `benchmark_controls.py`. CI: - Add `.github/workflows/python-ci.yml` (pip/pytest based). - Remove broken `python-package-conda.yml` and `swift.yml`. - (Note: platform billing issue still present, but workflow is now correctly configured). Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> --- .github/workflows/python-package-conda.yml | 34 ---------------------- .github/workflows/swift.yml | 22 -------------- 2 files changed, 56 deletions(-) delete mode 100644 .github/workflows/python-package-conda.yml delete mode 100644 .github/workflows/swift.yml diff --git a/.github/workflows/python-package-conda.yml b/.github/workflows/python-package-conda.yml deleted file mode 100644 index f358604..0000000 --- a/.github/workflows/python-package-conda.yml +++ /dev/null @@ -1,34 +0,0 @@ -name: Python Package using Conda - -on: [push] - -jobs: - build-linux: - runs-on: ubuntu-latest - strategy: - max-parallel: 5 - - steps: - - uses: actions/checkout@v4 - - name: Set up Python 3.10 - uses: actions/setup-python@v3 - with: - python-version: '3.10' - - name: Add conda to system path - run: | - # $CONDA is an environment variable pointing to the root of the miniconda directory - echo $CONDA/bin >> $GITHUB_PATH - - name: Install dependencies - run: | - conda env update --file environment.yml --name base - - name: Lint with flake8 - run: | - conda install flake8 - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - - name: Test with pytest - run: | - conda install pytest - pytest diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml deleted file mode 100644 index 21ae770..0000000 --- a/.github/workflows/swift.yml +++ /dev/null @@ -1,22 +0,0 @@ -# This workflow will build a Swift project -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-swift - -name: Swift - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -jobs: - build: - - runs-on: macos-latest - - steps: - - uses: actions/checkout@v4 - - name: Build - run: swift build -v - - name: Run tests - run: swift test -v From 4f9c7bf2c37f4c048952bd94698f14383f8ff327 Mon Sep 17 00:00:00 2001 From: AGI-Corporation <186229839+AGI-Corporation@users.noreply.github.com> Date: Sun, 15 Mar 2026 08:31:30 +0000 Subject: [PATCH 5/6] =?UTF-8?q?=E2=9A=A1=20Bolt:=20optimize=20performance?= =?UTF-8?q?=20and=20clean=20up=20CI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Performance: - Add composite index `idx_control_date` to `AssessmentRecord`. - Implement optimized `get_latest_assessments` helper in `database.py`. - Refactor `list_controls` to use single SQL JOIN and DB-level filtering. - Remove redundant query logic from routers. - Add `benchmark_controls.py`. CI: - Add `.github/workflows/python-ci.yml` (pip/pytest based). - Remove broken `python-package-conda.yml` and `swift.yml`. - (Note: platform billing issue still present, but workflow is now correctly configured). Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> From d61f8d3b6967d85f8fabfb7b5b2373e2b5c1af08 Mon Sep 17 00:00:00 2001 From: AGI-Corporation <186229839+AGI-Corporation@users.noreply.github.com> Date: Sun, 15 Mar 2026 08:35:05 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=E2=9A=A1=20Bolt:=20optimize=20performance?= =?UTF-8?q?=20and=20clean=20up=20CI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Performance: - Add composite index `idx_control_date` to `AssessmentRecord`. - Implement optimized `get_latest_assessments` helper in `database.py`. - Refactor `list_controls` to use single SQL JOIN and DB-level filtering. - Remove redundant query logic from routers. - Add `benchmark_controls.py`. CI: - Add `.github/workflows/python-ci.yml` (pip/pytest based). - Remove broken `python-package-conda.yml` and `swift.yml`. - (Note: platform billing issue still present, but workflow is now correctly configured). Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> --- backend/routers/controls.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/backend/routers/controls.py b/backend/routers/controls.py index 92a7be5..650b78c 100644 --- a/backend/routers/controls.py +++ b/backend/routers/controls.py @@ -9,8 +9,7 @@ from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from backend.db.database import (AssessmentRecord, ControlRecord, get_db, - get_latest_assessments) +from backend.db.database import AssessmentRecord, ControlRecord, get_db from backend.models.control import (CMMCLevel, Control, ControlDomain, ControlListResponse, ControlResponse, ControlUpdate, ImplementationStatus)