Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions backend/app/api/v1/builder.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
from fastapi import APIRouter, HTTPException
from app.services.builder import compile_rule, operator_catalog
from fastapi import APIRouter, Depends, Body
from sqlalchemy.orm import Session

from app.db.session import SessionLocal
from app.core.security import require_role
from app.services.builder.models import RuleDraft
from app.services.builder.compile import compile_sigma_from_draft
from app.services.builder.catalog import operator_catalog

router = APIRouter(prefix="/builder", tags=["builder"])


@router.get("/operators", response_model=dict)
def get_operators():
return {"operators": operator_catalog()}
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()


@router.post("/rules/draft", response_model=dict)
def compile_draft(draft: RuleDraft):
try:
sigma = compile_rule(draft)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"sigma": sigma}
@router.get("/operators", response_model=list[dict], summary="Builder operator catalog")
def get_operators(_=Depends(require_role("admin", "analyst", "viewer"))):
return operator_catalog()


@router.post("/compile", response_model=dict, summary="Compile draft to Sigma YAML")
def compile_draft(payload: RuleDraft = Body(...), _=Depends(require_role("admin", "analyst"))):
yaml_text = compile_sigma_from_draft(payload)
return {"sigma_yaml": yaml_text}
9 changes: 2 additions & 7 deletions backend/app/services/builder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
from typing import get_args

from .compile import compile_sigma_from_draft
from .models import RuleDraft, Operator
from .models import RuleDraft
from .catalog import operator_catalog


def compile_rule(draft: RuleDraft) -> str:
"""Compile a RuleDraft into Sigma YAML string."""
if not draft.predicates:
raise ValueError("predicates[] required")
return compile_sigma_from_draft(draft)


def operator_catalog() -> list[str]:
return list(get_args(Operator))
16 changes: 16 additions & 0 deletions backend/app/services/builder/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import annotations


def operator_catalog() -> list[dict]:
return [
{"op": "equals", "types": ["string", "number", "boolean"]},
{"op": "contains", "types": ["string"]},
{"op": "startswith", "types": ["string"]},
{"op": "endswith", "types": ["string"]},
{"op": "in", "types": ["string", "number"], "value": "list"},
{"op": "regex", "types": ["string"]},
{"op": "gt", "types": ["number", "date"]},
{"op": "gte", "types": ["number", "date"]},
{"op": "lt", "types": ["number", "date"]},
{"op": "lte", "types": ["number", "date"]},
]
30 changes: 20 additions & 10 deletions tests/backend/test_builder_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,27 @@

from fastapi import FastAPI
from fastapi.testclient import TestClient

from backend.app.api.v1.builder import router as builder_router
from backend.app.core.security import create_access_token

app = FastAPI()
app.include_router(builder_router, prefix="/api/v1")


def auth_header(role: str = "analyst") -> dict:
tok = create_access_token(username=role, role=role)
return {"Authorization": f"Bearer {tok}"}


def test_operator_catalog():
with TestClient(app) as client:
r = client.get("/api/v1/builder/operators")
hdr = auth_header("viewer")
r = client.get("/api/v1/builder/operators", headers=hdr)
assert r.status_code == 200
ops = r.json().get("operators")
assert "equals" in ops
assert "contains" in ops
ops = r.json()
assert any(o["op"] == "equals" for o in ops)
assert any(o["op"] == "contains" for o in ops)


def test_compile_rule_draft_any():
Expand All @@ -27,14 +35,15 @@ def test_compile_rule_draft_any():
"logsource": {"product": "windows"},
"predicates": [
{"field": "proc.name", "op": "equals", "value": "cmd.exe"},
{"field": "proc.cmd", "op": "contains", "value": "/c"}
{"field": "proc.cmd", "op": "contains", "value": "/c"},
],
"combine": "any",
}
with TestClient(app) as client:
r = client.post("/api/v1/builder/rules/draft", json=payload)
hdr = auth_header("analyst")
r = client.post("/api/v1/builder/compile", headers=hdr, json=payload)
assert r.status_code == 200
sigma = r.json().get("sigma")
sigma = r.json()["sigma_yaml"]
data = yaml.safe_load(sigma)
assert data["title"] == "Test Rule"
assert data["detection"]["condition"] == "sel0 or sel1"
Expand All @@ -46,14 +55,15 @@ def test_compile_rule_draft_all():
"logsource": {"service": "proc"},
"predicates": [
{"field": "a", "op": "equals", "value": 1},
{"field": "b", "op": "gt", "value": 2}
{"field": "b", "op": "gt", "value": 2},
],
"combine": "all",
}
with TestClient(app) as client:
r = client.post("/api/v1/builder/rules/draft", json=payload)
hdr = auth_header("analyst")
r = client.post("/api/v1/builder/compile", headers=hdr, json=payload)
assert r.status_code == 200
sigma = r.json().get("sigma")
sigma = r.json()["sigma_yaml"]
data = yaml.safe_load(sigma)
sel = data["detection"]["sel"]
assert sel["a|equals"] == 1
Expand Down
Loading