Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
allow_headers=["*"],
)


@app.get("/api/v1/healthz")
def healthz():
return {"status": "ok", "env": settings.env}


@app.get("/api/v1/readyz")
def readyz():
return {"ready": True}


app.include_router(auth_router, prefix="/api/v1")
app.include_router(ai_router, prefix="/api/v1")

Expand All @@ -52,3 +55,7 @@ def readyz():
app.include_router(router, prefix="/api/v1")
except Exception as e: # pragma: no cover - optional deps
logging.warning("Router %s not loaded: %s", mod_path, e)

from .services.validation.scheduler import start_scheduler # noqa: E402

_scheduler = start_scheduler()
2 changes: 1 addition & 1 deletion backend/app/services/validation/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from app.core.config import settings
from app.db import models
from app.services.sigma_eval.engine import evaluate_local
from app.services.sigma_eval.engine import evaluate_local, ensure_elastic_index # noqa: F401
from app.services.validation.confidence import compute_confidence
from opentelemetry import trace

Expand Down
120 changes: 120 additions & 0 deletions backend/app/services/validation/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import annotations

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.orm import Session
from pathlib import Path
from datetime import datetime, timezone
import logging

from app.db.session import SessionLocal
from app.db import models
from app.core.config import settings
from app.services.validation.orchestrator import (
run_evaluate_local,
run_evaluate_elastic,
update_validation_status,
bulk_index_events,
ensure_elastic_index,
)

log = logging.getLogger(__name__)


def _events_path(uri: str) -> Path | None:
if uri.startswith("file://"):
return Path(uri.replace("file://", "")).resolve()
return None


def _select_rules(db: Session, sched: models.ValidationSchedule):
q = db.query(models.Rule)
if sched.rule_ids:
q = q.filter(models.Rule.id.in_(sched.rule_ids))
elif sched.techniques:
q = q.filter(models.Rule.attack_techniques.overlap(sched.techniques))
else:
q = q.filter(models.Rule.status == models.RuleStatus.active)
return q.all()


def _execute_schedule(db: Session, sched: models.ValidationSchedule):
run = models.AttackRun(
name=f"schedule:{sched.name}",
techniques=sched.techniques or [],
source=models.RunSource.local,
status=models.RunStatus.running,
started_at=datetime.now(timezone.utc),
)
db.add(run)
db.commit()
db.refresh(run)

rules = _select_rules(db, sched)
ev = _events_path(sched.dataset_uri)
if not ev or not ev.exists():
log.warning("dataset missing for schedule %s", sched.name)
run.status = models.RunStatus.failed
db.add(run)
db.commit()
return

if sched.engine == "elastic":
from elasticsearch import Elasticsearch

es = Elasticsearch(settings.elastic_url)
index_name = f"{settings.elastic_index_prefix}-{run.id}"
ensure_elastic_index(es, index_name)
bulk_index_events(es, index_name, ev)
res = run_evaluate_elastic(db, run, index_name, rules, es)
else:
res = run_evaluate_local(db, run, ev, rules)

run.status = models.RunStatus.completed
run.ended_at = datetime.now(timezone.utc)
db.add(run)
update_validation_status(db, run, res)
sched.last_run_at = run.ended_at
db.add(sched)
db.commit()


def start_scheduler():
if not getattr(settings, "scheduler_enabled", True):
return None
if not hasattr(models, "ValidationSchedule"):
log.info("ValidationSchedule model missing; scheduler disabled")
return None
sched = BackgroundScheduler(timezone=getattr(settings, "scheduler_timezone", "UTC"))

def _load_jobs():
db = SessionLocal()
try:
for s in (
db.query(models.ValidationSchedule)
.filter(models.ValidationSchedule.enabled == True) # noqa: E712
.all()
):
trig = CronTrigger.from_crontab(s.cron)
sched.add_job(
lambda sid=s.id: _run_job(sid),
trig,
id=str(s.id),
replace_existing=True,
max_instances=1,
)
finally:
db.close()

def _run_job(sid):
db = SessionLocal()
try:
row = db.get(models.ValidationSchedule, sid)
if row and row.enabled:
_execute_schedule(db, row)
finally:
db.close()

_load_jobs()
sched.start()
return sched
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"opentelemetry-instrumentation-requests>=0.46b0",
"opentelemetry-exporter-otlp>=1.25.0",
"structlog>=24.1.0",
"APScheduler>=3.10",
"pytest>=8.2",
"pytest-cov>=5.0",
]
Expand Down
Loading