diff --git a/docs/getting-started/concepts.md b/docs/getting-started/concepts.md index c6adbb1..cb0535e 100644 --- a/docs/getting-started/concepts.md +++ b/docs/getting-started/concepts.md @@ -141,21 +141,19 @@ ti = Cyvest.io_load_threat_intel_draft(report) obs.with_ti_draft(ti) ``` -An optional `preprocessor` callback lets you normalise source-specific data before validation: +An optional `preprocessor` callback lets you normalise source-specific data before validation. +For the common case of forcing certain reports to **SAFE**, use `safe_getter` and `safe_values` instead: ```python -def misp_warning_list_preprocessor(data: dict) -> dict: - extra = data.get("extra") - task_name = str(extra.get("task_name", "")) if isinstance(extra, dict) else "" - warning_list_tasks = {"MISP.analyzer.DBWarningList", "MISP.analyzer.SearchWarningList"} - if task_name in warning_list_tasks and data.get("level") not in ("INFO", "SAFE"): - data["level"] = "SAFE" - data["score"] = 0.0 - return data - -ti = Cyvest.io_load_threat_intel_draft(report, preprocessor=misp_warning_list_preprocessor) +ti = Cyvest.io_load_threat_intel_draft( + report, + safe_getter=lambda d: d.get("extra", {}).get("task_name", ""), + safe_values=["MISP.analyzer.DBWarningList", "MISP.analyzer.SearchWarningList"], +) ``` +When `safe_getter(report)` matches any entry in `safe_values` and the level is not already INFO or SAFE, the score is set to `0.0` and the level to `SAFE`. + ### Tags - Group related checks together - Create logical investigation sections diff --git a/js/packages/cyvest-app/package.json b/js/packages/cyvest-app/package.json index a069567..548b018 100644 --- a/js/packages/cyvest-app/package.json +++ b/js/packages/cyvest-app/package.json @@ -1,6 +1,6 @@ { "name": "@cyvest/cyvest-app", - "version": "5.3.2", + "version": "5.3.3", "private": true, "scripts": { "dev": "vite", diff --git a/js/packages/cyvest-js/package.json b/js/packages/cyvest-js/package.json index 430edea..e84685e 100644 --- a/js/packages/cyvest-js/package.json +++ b/js/packages/cyvest-js/package.json @@ -1,6 +1,6 @@ { "name": "@cyvest/cyvest-js", - "version": "5.3.2", + "version": "5.3.3", "type": "module", "files": [ "dist" diff --git a/js/packages/cyvest-vis/package.json b/js/packages/cyvest-vis/package.json index 397cf1b..4be3b7c 100644 --- a/js/packages/cyvest-vis/package.json +++ b/js/packages/cyvest-vis/package.json @@ -1,6 +1,6 @@ { "name": "@cyvest/cyvest-vis", - "version": "5.3.2", + "version": "5.3.3", "type": "module", "files": [ "dist" diff --git a/pyproject.toml b/pyproject.toml index d537b41..901b8fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cyvest" -version = "5.3.2" +version = "5.3.3" description = "Cybersecurity investigation model" readme = {file = "README.md", content-type = "text/markdown"} requires-python = ">=3.10" diff --git a/src/cyvest/__init__.py b/src/cyvest/__init__.py index d19c0fa..9fabe4d 100644 --- a/src/cyvest/__init__.py +++ b/src/cyvest/__init__.py @@ -21,7 +21,7 @@ from cyvest.model_enums import ObservableType, RelationshipDirection, RelationshipType from cyvest.proxies import CheckProxy, EnrichmentProxy, ObservableProxy, TagProxy, ThreatIntelProxy -__version__ = "5.3.2" +__version__ = "5.3.3" logger.disable("cyvest") diff --git a/src/cyvest/cyvest.py b/src/cyvest/cyvest.py index bfdb3ff..f10f504 100644 --- a/src/cyvest/cyvest.py +++ b/src/cyvest/cyvest.py @@ -1068,6 +1068,8 @@ def io_load_threat_intel_draft( report: dict[str, Any], *, preprocessor: Callable[[dict[str, Any]], dict[str, Any]] | None = None, + safe_getter: Callable[[dict[str, Any]], Any] | None = None, + safe_values: Iterable[str] | None = None, ) -> ThreatIntel: """ Load a ThreatIntel draft from an external API report dict. @@ -1084,14 +1086,19 @@ def io_load_threat_intel_draft( external service (e.g. a SOAR/TIP API response). preprocessor: Optional callback that receives a **shallow copy** of *report* and returns a (possibly modified) dict before - validation. Useful for source-specific normalisation such - as overriding the level for warning-list entries. + validation. Runs before the safe-override check. + safe_getter: Optional callable that extracts a value from the + report dict to match against *safe_values*. + safe_values: Values that, when matched by *safe_getter*, force + score to ``0.0`` and level to ``SAFE``. Requires + *safe_getter* to be set. Returns: Unbound ThreatIntel instance (observable_key is empty). Raises: TypeError: If *report* is not a dict. + ValueError: If *safe_values* is set without *safe_getter*. pydantic.ValidationError: If the extracted payload fails ThreatIntel model validation. @@ -1099,29 +1106,35 @@ def io_load_threat_intel_draft( Basic usage:: report = {"source": "virustotal", "score": 4.256, "level": "SUSPICIOUS"} - ti = cv.io_load_threat_intel_draft(report) + ti = Cyvest.io_load_threat_intel_draft(report) obs.with_ti_draft(ti) - With a preprocessor that forces MISP warning-list reports to SAFE:: + Force MISP warning-list reports to SAFE with ``safe_getter``:: - def misp_warning_list_preprocessor(data: dict) -> dict: - extra = data.get("extra") - task_name = str(extra.get("task_name", "")) if isinstance(extra, dict) else "" - warning_list_tasks = {"MISP.analyzer.DBWarningList", "MISP.analyzer.SearchWarningList"} - if task_name in warning_list_tasks and data.get("level") not in ("INFO", "SAFE"): - data["level"] = "SAFE" - data["score"] = 0.0 - return data - - ti = cv.io_load_threat_intel_draft(report, preprocessor=misp_warning_list_preprocessor) + ti = Cyvest.io_load_threat_intel_draft( + report, + safe_getter=lambda d: d.get("extra", {}).get("task_name", ""), + safe_values=["MISP.analyzer.DBWarningList", "MISP.analyzer.SearchWarningList"], + ) """ if not isinstance(report, dict): raise TypeError(f"report must be a dict, got {type(report).__name__}") + if safe_values is not None and safe_getter is None: + raise ValueError("safe_values requires safe_getter to be set.") data: dict[str, Any] = dict(report) # shallow copy if preprocessor is not None: data = preprocessor(data) + # Built-in safe override: if safe_getter(data) matches any safe_values + # and level is not already INFO or SAFE, force SAFE + score 0. + if safe_getter is not None and safe_values is not None: + matched = safe_getter(data) + safe_set = set(safe_values) + if matched in safe_set and data.get("level") not in ("INFO", "SAFE"): + data["level"] = "SAFE" + data["score"] = 0.0 + raw_score = data.get("score") if raw_score is not None: rounded = round_score_decimal(Decimal(str(raw_score))) diff --git a/tests/test_cyvest.py b/tests/test_cyvest.py index d81170a..f302597 100644 --- a/tests/test_cyvest.py +++ b/tests/test_cyvest.py @@ -872,6 +872,66 @@ def force_safe(data: dict) -> dict: assert ti.score == Decimal("0.00") +def test_threat_intel_from_report_safe_getter_triggers() -> None: + """safe_getter + safe_values overrides level/score to SAFE/0 when matched.""" + report = { + "source": "misp", + "score": 7.5, + "level": "MALICIOUS", + "extra": {"task_name": "MISP.analyzer.DBWarningList"}, + } + ti = Cyvest.io_load_threat_intel_draft( + report, + safe_getter=lambda d: d.get("extra", {}).get("task_name", ""), + safe_values=["MISP.analyzer.DBWarningList", "MISP.analyzer.SearchWarningList"], + ) + assert ti.level.value == "SAFE" + assert ti.score == Decimal("0.00") + + +def test_threat_intel_from_report_safe_getter_no_match() -> None: + """safe_getter that doesn't match keeps original level/score.""" + report = { + "source": "misp", + "score": 7.5, + "level": "MALICIOUS", + "extra": {"task_name": "MISP.analyzer.SomeOtherTask"}, + } + ti = Cyvest.io_load_threat_intel_draft( + report, + safe_getter=lambda d: d.get("extra", {}).get("task_name", ""), + safe_values=["MISP.analyzer.DBWarningList"], + ) + assert ti.level.value == "MALICIOUS" + assert ti.score == Decimal("7.50") + + +def test_threat_intel_from_report_safe_getter_already_safe() -> None: + """safe_getter match with level already SAFE keeps SAFE (no double override).""" + report = { + "source": "misp", + "score": 0.0, + "level": "SAFE", + "extra": {"task_name": "MISP.analyzer.DBWarningList"}, + } + ti = Cyvest.io_load_threat_intel_draft( + report, + safe_getter=lambda d: d.get("extra", {}).get("task_name", ""), + safe_values=["MISP.analyzer.DBWarningList"], + ) + assert ti.level.value == "SAFE" + assert ti.score == Decimal("0.00") + + +def test_threat_intel_from_report_safe_values_without_getter_raises() -> None: + """safe_values without safe_getter raises ValueError.""" + with pytest.raises(ValueError, match="safe_values requires safe_getter"): + Cyvest.io_load_threat_intel_draft( + {"source": "test", "score": 1.0}, + safe_values=["something"], + ) + + def test_threat_intel_from_report_does_not_mutate_input() -> None: """Original report dict is not modified.""" cv = Cyvest() @@ -899,3 +959,45 @@ def test_threat_intel_from_report_attach_to_observable() -> None: refreshed = cv.observable_get(obs.key) assert refreshed is not None assert refreshed.score >= Decimal("8.00") + + +def test_threat_intel_from_report_preprocessor_then_safe_getter() -> None: + """Preprocessor runs first, then safe_getter evaluates the modified data.""" + report = { + "source": "misp", + "score": 6.0, + "level": "MALICIOUS", + "extra": {"raw_task": "DBWarningList"}, + } + + # Preprocessor normalises task_name so safe_getter can match it + def normalise(data: dict) -> dict: + raw = data.get("extra", {}).get("raw_task", "") + data.setdefault("extra", {})["task_name"] = f"MISP.analyzer.{raw}" + return data + + ti = Cyvest.io_load_threat_intel_draft( + report, + preprocessor=normalise, + safe_getter=lambda d: d.get("extra", {}).get("task_name", ""), + safe_values=["MISP.analyzer.DBWarningList"], + ) + assert ti.level.value == "SAFE" + assert ti.score == Decimal("0.00") + + +def test_threat_intel_from_report_safe_getter_with_info_level_unchanged() -> None: + """safe_getter match with level=INFO keeps INFO (not overridden to SAFE).""" + report = { + "source": "misp", + "score": 0.0, + "level": "INFO", + "extra": {"task_name": "MISP.analyzer.SearchWarningList"}, + } + ti = Cyvest.io_load_threat_intel_draft( + report, + safe_getter=lambda d: d.get("extra", {}).get("task_name", ""), + safe_values=["MISP.analyzer.SearchWarningList"], + ) + assert ti.level.value == "INFO" + assert ti.score == Decimal("0.00") diff --git a/uv.lock b/uv.lock index 273412a..ac6be13 100644 --- a/uv.lock +++ b/uv.lock @@ -305,7 +305,7 @@ toml = [ [[package]] name = "cyvest" -version = "5.3.2" +version = "5.3.3" source = { editable = "." } dependencies = [ { name = "click" },