Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Python CI

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.12
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests
run: |
python3 -m pytest tests/test_backend.py
Comment on lines +22 to +24
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Set DATABASE_URL explicitly for CI test isolation.

Line 22–24 runs tests without defining DATABASE_URL. Given the PR’s test setup relies on it, CI may fail or connect to an unintended default database. Please set it on the test step.

✅ Suggested patch
     - name: Run tests
+      env:
+        DATABASE_URL: sqlite:///./test.db
       run: |
         python3 -m pytest tests/test_backend.py
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- name: Run tests
run: |
python3 -m pytest tests/test_backend.py
- name: Run tests
env:
DATABASE_URL: sqlite:///./test.db
run: |
python3 -m pytest tests/test_backend.py
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.github/workflows/python-ci.yml around lines 22 - 24, The "Run tests" CI
step currently executes pytest without setting DATABASE_URL, which can cause
tests to hit an unintended DB; update the "Run tests" step to define
DATABASE_URL explicitly (e.g. via an env key or step env) so tests run in
isolation; modify the step named "Run tests" to export DATABASE_URL (for example
to a CI-safe value like a test sqlite URL or a temporary test DB connection
string) before running python3 -m pytest tests/test_backend.py.

34 changes: 0 additions & 34 deletions .github/workflows/python-package-conda.yml

This file was deleted.

22 changes: 0 additions & 22 deletions .github/workflows/swift.yml

This file was deleted.

7 changes: 7 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## 2025-03-04 - [Database-Level Filtering for Implementation Status]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix changelog dates for traceability.

Line 1 and Line 5 use 2025-03-04, but this PR is dated March 4, 2026. Please update the headings so history aligns with the actual change date.

Also applies to: 5-5

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.jules/bolt.md at line 1, Update the changelog headings that currently read
"2025-03-04" to the correct date "2026-03-04"; specifically replace the heading
string "## 2025-03-04 - [Database-Level Filtering for Implementation Status]"
and any other occurrences of "2025-03-04" in this file (including the second
occurrence at line 5) with "2026-03-04" so the changelog dates align with the PR
date.

**Learning:** In this architecture, 'not_started' status is represented by the absence of an `AssessmentRecord`. Python-side filtering for this status after fetching all controls is an anti-pattern that leads to O(N) database queries or massive over-fetching.
**Action:** Use a `LEFT OUTER JOIN` with a subquery for the latest assessment and explicitly check `AssessmentRecord.id.is_(None)` in the SQL `WHERE` clause to filter for 'not_started' controls at the database level.

## 2025-03-04 - [Optimizing Latest-State Queries]
**Learning:** Fetching the "latest state" for 110+ controls (Standard CMMC L2) is the most frequent and expensive operation. A composite index on `(control_id, assessment_date)` is critical for the performance of the subquery join used to resolve the latest assessment.
**Action:** Always ensure `AssessmentRecord` has a composite index on `control_id` and `assessment_date` when implementing framework-wide status lookups.
100 changes: 76 additions & 24 deletions agents/devsecops_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
Aligns with CMMC CM/SI domains, Fulcrum LOE 2, Cloud Security Playbook Play 19-21.
Responsibilities: container scanning, SBOM generation, pipeline gate evaluation.
"""

import uuid
from datetime import datetime, UTC
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from backend.db.database import get_db, AgentRunRecord

from backend.db.database import AgentRunRecord, get_db


class SeverityLevel(str, Enum):
Expand All @@ -33,20 +36,35 @@ class DevSecOpsAgent:
def __init__(self, mock_mode: bool = True):
self.mock_mode = mock_mode

def scan_container_image(self, image_name: str, image_tag: str = "latest",
base_image: Optional[str] = None) -> Dict[str, Any]:
def scan_container_image(
self,
image_name: str,
image_tag: str = "latest",
base_image: Optional[str] = None,
) -> Dict[str, Any]:
"""Simulate CVE scan. Production: Trivy/Grype. Maps to NIST 800-190, CMMC SI.1.210."""
cves = [
{"cve_id": "CVE-2024-1234", "severity": "high", "package": "openssl",
"version": "3.0.1", "fixed_in": "3.0.2", "cmmc_controls": ["SI.1.210", "SI.2.214"]}
] if "vulnerable" in image_name else []
cves = (
[
{
"cve_id": "CVE-2024-1234",
"severity": "high",
"package": "openssl",
"version": "3.0.1",
"fixed_in": "3.0.2",
"cmmc_controls": ["SI.1.210", "SI.2.214"],
}
]
if "vulnerable" in image_name
else []
)
return {
"image": f"{image_name}:{image_tag}",
"base_image": base_image,
"base_image_approved": base_image in self.APPROVED_BASE_IMAGES,
"overall_risk": "pass" if not cves else "high",
"cve_findings": cves,
"critical_count": 0, "high_count": len(cves),
"critical_count": 0,
"high_count": len(cves),
"evidence_id": str(uuid.uuid4()),
"scanned_at": datetime.now(UTC).isoformat(),
"cmmc_controls": ["SI.1.210", "SI.2.214", "CM.2.061", "CM.3.068"],
Expand All @@ -56,14 +74,34 @@ def scan_container_image(self, image_name: str, image_tag: str = "latest",
def generate_sbom(self, service_name: str) -> Dict[str, Any]:
"""Generate CycloneDX SBOM. Maps to EO 14028, CMMC CM.2.062."""
components = [
{"name": "fastapi", "version": "0.111.0", "license": "MIT",
"supplier": "tiangolo", "cves": 0},
{"name": "mistralai", "version": "0.4.2", "license": "Apache-2.0",
"supplier": "Mistral AI", "cves": 0},
{"name": "sqlalchemy", "version": "2.0.30", "license": "MIT",
"supplier": "SQLAlchemy", "cves": 0},
{"name": "pydantic", "version": "2.7.0", "license": "MIT",
"supplier": "pydantic", "cves": 0},
{
"name": "fastapi",
"version": "0.111.0",
"license": "MIT",
"supplier": "tiangolo",
"cves": 0,
},
{
"name": "mistralai",
"version": "0.4.2",
"license": "Apache-2.0",
"supplier": "Mistral AI",
"cves": 0,
},
{
"name": "sqlalchemy",
"version": "2.0.30",
"license": "MIT",
"supplier": "SQLAlchemy",
"cves": 0,
},
{
"name": "pydantic",
"version": "2.7.0",
"license": "MIT",
"supplier": "pydantic",
"cves": 0,
},
]
return {
"sbom_format": "CycloneDX-1.5",
Expand All @@ -80,8 +118,12 @@ def generate_sbom(self, service_name: str) -> Dict[str, Any]:
def evaluate_pipeline_gates(self, pipeline_id: str) -> Dict[str, Any]:
"""Evaluate CI/CD security gates. Maps to Cloud Security Playbook Play 20."""
gate_results = {
"sast": "pass", "secret_scan": "pass", "dependency_check": "warn",
"container_scan": "pass", "sbom_generation": "pass", "image_signing": "fail",
"sast": "pass",
"secret_scan": "pass",
"dependency_check": "warn",
"container_scan": "pass",
"sbom_generation": "pass",
"image_signing": "fail",
}
failed = [g for g, s in gate_results.items() if s == "fail"]
warned = [g for g, s in gate_results.items() if s == "warn"]
Expand All @@ -102,7 +144,9 @@ def evaluate_pipeline_gates(self, pipeline_id: str) -> Dict[str, Any]:
"evaluated_at": datetime.now(UTC).isoformat(),
}

async def run_full_assessment(self, db: AsyncSession, service_name: str = "cmmc-api", trigger: str = "manual") -> Dict[str, Any]:
async def run_full_assessment(
self, db: AsyncSession, service_name: str = "cmmc-api", trigger: str = "manual"
) -> Dict[str, Any]:
"""Run complete DevSecOps assessment pipeline."""
image_scan = self.scan_container_image(service_name)
sbom = self.generate_sbom(service_name)
Expand Down Expand Up @@ -131,11 +175,17 @@ async def run_full_assessment(self, db: AsyncSession, service_name: str = "cmmc-
agent_type="devsecops",
trigger=trigger,
scope=service_name,
controls_evaluated=list(set(image_scan["cmmc_controls"] + sbom["cmmc_controls"] + pipeline["cmmc_controls"])),
controls_evaluated=list(
set(
image_scan["cmmc_controls"]
+ sbom["cmmc_controls"]
+ pipeline["cmmc_controls"]
)
),
findings=result,
status="completed",
created_at=datetime.now(UTC),
completed_at=datetime.now(UTC)
completed_at=datetime.now(UTC),
)
db.add(record)
await db.commit()
Expand All @@ -147,7 +197,9 @@ async def run_full_assessment(self, db: AsyncSession, service_name: str = "cmmc-
_dso = DevSecOpsAgent()


@router.get("/assess/{service_name}", summary="Run full DevSecOps ZT Application assessment")
@router.get(
"/assess/{service_name}", summary="Run full DevSecOps ZT Application assessment"
)
async def assess_service(service_name: str, db: AsyncSession = Depends(get_db)):
"""Container scan + SBOM + pipeline gates - ZT Application Pillar evidence."""
return await _dso.run_full_assessment(db, service_name)
Expand Down
Loading
Loading