From 1d13c3d6cf74597106a380f7508df06f4ff72a66 Mon Sep 17 00:00:00 2001 From: Jose Hernandez Date: Thu, 5 Jun 2025 16:21:24 -0700 Subject: [PATCH 1/4] fixing enrichment map --- contentctl/enrichments/attack_enrichment.py | 104 ++++++---- contentctl/output/attack_nav_output.py | 199 +++++++++++++++++--- pyproject.toml | 1 + 3 files changed, 239 insertions(+), 65 deletions(-) diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index b437860d..6a111760 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -1,22 +1,44 @@ from __future__ import annotations -from attackcti import attack_client + import logging -from pydantic import BaseModel from dataclasses import field -from typing import Any from pathlib import Path +from typing import Any, Dict, List, TypedDict, cast + +from attackcti import attack_client # type: ignore[reportMissingTypeStubs] +from pydantic import BaseModel + +from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE +from contentctl.objects.config import validate from contentctl.objects.mitre_attack_enrichment import ( MitreAttackEnrichment, MitreTactics, ) -from contentctl.objects.config import validate -from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE +# Suppress attackcti logging logging.getLogger("taxii2client").setLevel(logging.CRITICAL) +logging.getLogger("stix2").setLevel(logging.CRITICAL) + + +class AttackPattern(TypedDict): + id: str + technique_id: str + technique: str + tactic: List[str] + + +class IntrusionSet(TypedDict): + id: str + group: str + + +class Relationship(TypedDict): + target_object: str + source_object: str class AttackEnrichment(BaseModel): - data: dict[str, MitreAttackEnrichment] = field(default_factory=dict) + data: Dict[str, MitreAttackEnrichment] = field(default_factory=dict) use_enrichment: bool = True @staticmethod @@ -42,7 +64,7 @@ def getEnrichmentByMitreID( ) def addMitreIDViaGroupNames( - self, technique: dict[str, Any], tactics: list[str], groupNames: list[str] + self, technique: Dict[str, Any], tactics: List[str], groupNames: List[str] ) -> None: technique_id = technique["technique_id"] technique_obj = technique["technique"] @@ -62,15 +84,15 @@ def addMitreIDViaGroupNames( def addMitreIDViaGroupObjects( self, - technique: dict[str, Any], - tactics: list[MitreTactics], - groupDicts: list[dict[str, Any]], + technique: Dict[str, Any], + tactics: List[MitreTactics], + groupDicts: List[Dict[str, Any]], ) -> None: technique_id = technique["technique_id"] technique_obj = technique["technique"] tactics.sort() - groupNames: list[str] = sorted([group["group"] for group in groupDicts]) + groupNames: List[str] = sorted([group["group"] for group in groupDicts]) if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") @@ -87,8 +109,8 @@ def addMitreIDViaGroupObjects( def get_attack_lookup( self, input_path: Path, enrichments: bool = False - ) -> dict[str, MitreAttackEnrichment]: - attack_lookup: dict[str, MitreAttackEnrichment] = {} + ) -> Dict[str, MitreAttackEnrichment]: + attack_lookup: Dict[str, MitreAttackEnrichment] = {} if not enrichments: return attack_lookup @@ -98,11 +120,6 @@ def get_attack_lookup( end="", flush=True, ) - # The existence of the input_path is validated during cli argument validation, but it is - # possible that the repo is in the wrong format. If the following directories do not - # exist, then attack_client will fall back to resolving via REST API. We do not - # want this as it is slow and error prone, so we will force an exception to - # be generated. enterprise_path = input_path / "enterprise-attack" mobile_path = input_path / "ics-attack" ics_path = input_path / "mobile-attack" @@ -123,36 +140,47 @@ def get_attack_lookup( } ) - all_enterprise_techniques = lift.get_enterprise_techniques( - stix_format=False + all_enterprise_techniques = cast( + List[AttackPattern], lift.get_enterprise_techniques(stix_format=False) ) - enterprise_relationships = lift.get_enterprise_relationships( - stix_format=False + enterprise_relationships = cast( + List[Relationship], lift.get_enterprise_relationships(stix_format=False) + ) + enterprise_groups = cast( + List[IntrusionSet], lift.get_enterprise_groups(stix_format=False) ) - enterprise_groups = lift.get_enterprise_groups(stix_format=False) for technique in all_enterprise_techniques: - apt_groups: list[dict[str, Any]] = [] + apt_groups: List[Dict[str, Any]] = [] for relationship in enterprise_relationships: - if ( - relationship["target_object"] == technique["id"] - ) and relationship["source_object"].startswith("intrusion-set"): + if relationship["target_object"] == technique[ + "id" + ] and relationship["source_object"].startswith("intrusion-set"): for group in enterprise_groups: if relationship["source_object"] == group["id"]: - apt_groups.append(group) - # apt_groups.append(group['group']) + apt_groups.append(dict(group)) - tactics = [] + tactics: List[MitreTactics] = [] if "tactic" in technique: for tactic in technique["tactic"]: - tactics.append(tactic.replace("-", " ").title()) - - self.addMitreIDViaGroupObjects(technique, tactics, apt_groups) - attack_lookup[technique["technique_id"]] = { - "technique": technique["technique"], - "tactics": tactics, - "groups": apt_groups, - } + tactics.append( + cast(MitreTactics, tactic.replace("-", " ").title()) + ) + + self.addMitreIDViaGroupObjects(dict(technique), tactics, apt_groups) + attack_lookup[technique["technique_id"]] = ( + MitreAttackEnrichment.model_validate( + { + "mitre_attack_id": technique["technique_id"], + "mitre_attack_technique": technique["technique"], + "mitre_attack_tactics": tactics, + "mitre_attack_groups": [ + group["group"] for group in apt_groups + ], + "mitre_attack_group_objects": apt_groups, + } + ) + ) except Exception as err: raise Exception(f"Error getting MITRE Enrichment: {str(err)}") diff --git a/contentctl/output/attack_nav_output.py b/contentctl/output/attack_nav_output.py index 61436fe7..0997fdb3 100644 --- a/contentctl/output/attack_nav_output.py +++ b/contentctl/output/attack_nav_output.py @@ -1,47 +1,193 @@ +import json import pathlib -from typing import List, Union +from datetime import datetime +from typing import Any, Dict, List, Set, TypedDict, Union from contentctl.objects.detection import Detection -from contentctl.output.attack_nav_writer import AttackNavWriter + + +class TechniqueData(TypedDict): + score: int + file_paths: List[str] + links: List[Dict[str, str]] + + +class LayerData(TypedDict): + name: str + versions: Dict[str, str] + domain: str + description: str + filters: Dict[str, List[str]] + sorting: int + layout: Dict[str, Union[str, bool]] + hideDisabled: bool + techniques: List[Dict[str, Any]] + gradient: Dict[str, Union[List[str], int]] + legendItems: List[Dict[str, str]] + showTacticRowBackground: bool + tacticRowBackground: str + selectTechniquesAcrossTactics: bool + selectSubtechniquesWithParent: bool + selectVisibleTechniques: bool + metadata: List[Dict[str, str]] class AttackNavOutput: + def __init__( + self, + layer_name: str = "Splunk Detection Coverage", + layer_description: str = "MITRE ATT&CK coverage for Splunk detections", + layer_domain: str = "enterprise-attack", + ): + self.layer_name = layer_name + self.layer_description = layer_description + self.layer_domain = layer_domain + def writeObjects( self, detections: List[Detection], output_path: pathlib.Path ) -> None: - techniques: dict[str, dict[str, Union[List[str], int]]] = {} + """ + Generate MITRE ATT&CK Navigator layer file from detections + Args: + detections: List of Detection objects + output_path: Path to write the layer file + """ + techniques: Dict[str, TechniqueData] = {} + tactic_coverage: Dict[str, Set[str]] = {} + # Process each detection for detection in detections: + if not hasattr(detection.tags, "mitre_attack_id"): + continue + for tactic in detection.tags.mitre_attack_id: if tactic not in techniques: - techniques[tactic] = {"score": 0, "file_paths": []} + techniques[tactic] = {"score": 0, "file_paths": [], "links": []} + tactic_coverage[tactic] = set() detection_type = detection.source - detection_id = detection.id + detection_id = str(detection.id) # Convert UUID to string + detection_url = ( + f"https://research.splunk.com/{detection_type}/{detection_id}/" + ) + detection_name = detection.name.replace( + "_", " " + ).title() # Convert to Title Case + detection_info = f"{detection_name}" - # Store all three pieces of information separately - detection_info = f"{detection_type}|{detection_id}|{detection.name}" + techniques[tactic]["score"] += 1 + techniques[tactic]["file_paths"].append(detection_info) + techniques[tactic]["links"].append( + {"label": detection_name, "url": detection_url} + ) + tactic_coverage[tactic].add(detection_id) - techniques[tactic]["score"] = techniques[tactic].get("score", 0) + 1 - if isinstance(techniques[tactic]["file_paths"], list): - techniques[tactic]["file_paths"].append(detection_info) + # Create the layer file + layer: LayerData = { + "name": self.layer_name, + "versions": { + "attack": "14", # Update as needed + "navigator": "5.1.0", + "layer": "4.5", + }, + "domain": self.layer_domain, + "description": self.layer_description, + "filters": { + "platforms": [ + "Windows", + "Linux", + "macOS", + "AWS", + "GCP", + "Azure", + "Office 365", + "SaaS", + ] + }, + "sorting": 0, + "layout": { + "layout": "flat", + "showName": True, + "showID": False, + "showAggregateScores": True, + "countUnscored": True, + "aggregateFunction": "average", + "expandedSubtechniques": "none", + }, + "hideDisabled": False, + "techniques": [ + { + "techniqueID": tid, + "score": data["score"], + "metadata": [ + { + "name": "Detections", + "value": "\n".join( + [f"• {name}" for name in data["file_paths"]] + ), + }, + {"divider": True}, + { + "name": "Links", + "value": "\n".join( + [ + f"• [{link['label']}]({link['url']})" + for link in data["links"] + ] + ), + }, + ], + "links": data["links"], + } + for tid, data in techniques.items() + ], + "gradient": { + "colors": [ + "#1a365d", # Dark blue + "#2c5282", # Medium blue + "#4299e1", # Light blue + "#48bb78", # Light green + "#38a169", # Medium green + "#276749", # Dark green + ], + "minValue": 0, + "maxValue": 5, # Adjust based on your max detections per technique + }, + "legendItems": [ + {"label": "1 Detection", "color": "#1a365d"}, + {"label": "2 Detections", "color": "#4299e1"}, + {"label": "3 Detections", "color": "#48bb78"}, + {"label": "4+ Detections", "color": "#276749"}, + ], + "showTacticRowBackground": True, + "tacticRowBackground": "#dddddd", + "selectTechniquesAcrossTactics": True, + "selectSubtechniquesWithParent": True, + "selectVisibleTechniques": False, + "metadata": [ + {"name": "Generated", "value": datetime.now().isoformat()}, + {"name": "Total Detections", "value": str(len(detections))}, + {"name": "Covered Techniques", "value": str(len(techniques))}, + ], + } - """ - for detection in objects: - if detection.tags.mitre_attack_enrichments: - for mitre_attack_enrichment in detection.tags.mitre_attack_enrichments: - if not mitre_attack_enrichment.mitre_attack_id in techniques: - techniques[mitre_attack_enrichment.mitre_attack_id] = { - 'score': 1, - 'file_paths': ['https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name)] - } - else: - techniques[mitre_attack_enrichment.mitre_attack_id]['score'] = techniques[mitre_attack_enrichment.mitre_attack_id]['score'] + 1 - techniques[mitre_attack_enrichment.mitre_attack_id]['file_paths'].append('https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name)) - """ - AttackNavWriter.writeAttackNavFile(techniques, output_path / "coverage.json") + # Write the layer file + output_file = output_path / "coverage.json" + with open(output_file, "w") as f: + json.dump(layer, f, indent=2) + + print(f"\n✅ MITRE ATT&CK Navigator layer file written to: {output_file}") + print("📊 Coverage Summary:") + print(f" Total Detections: {len(detections)}") + print(f" Covered Techniques: {len(techniques)}") + print(f" Tactics with Coverage: {len(tactic_coverage)}") + print("\n🗺️ To view the layer:") + print(" 1. Go to https://mitre-attack.github.io/attack-navigator/") + print(" 2. Click 'Open Existing Layer'") + print(f" 3. Select the file: {output_file}") - def convertNameToFileName(self, name: str): + def convertNameToFileName(self, name: str) -> str: + """Convert a detection name to a valid filename""" file_name = ( name.replace(" ", "_") .replace("-", "_") @@ -49,5 +195,4 @@ def convertNameToFileName(self, name: str): .replace("/", "_") .lower() ) - file_name = file_name + ".yml" - return file_name + return f"{file_name}.yml" diff --git a/pyproject.toml b/pyproject.toml index 73d869e7..3d256029 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ pygit2 = "^1.15.1" tyro = "^0.9.2" gitpython = "^3.1.43" setuptools = ">=69.5.1,<76.0.0" +pandas = "^2.3.0" [tool.poetry.group.dev.dependencies] ruff = "^0.9.10" From f82bed7fa263e4b59fb73f1ba82522d68347f418 Mon Sep 17 00:00:00 2001 From: Jose Hernandez Date: Thu, 5 Jun 2025 16:27:45 -0700 Subject: [PATCH 2/4] multiple metadata --- contentctl/output/attack_nav_output.py | 29 ++++++++++++-------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/contentctl/output/attack_nav_output.py b/contentctl/output/attack_nav_output.py index 0997fdb3..a433b03b 100644 --- a/contentctl/output/attack_nav_output.py +++ b/contentctl/output/attack_nav_output.py @@ -120,24 +120,21 @@ def writeObjects( "techniqueID": tid, "score": data["score"], "metadata": [ + {"name": "Detection", "value": name, "divider": False} + for name in data["file_paths"] + ] + + [ { - "name": "Detections", - "value": "\n".join( - [f"• {name}" for name in data["file_paths"]] - ), - }, - {"divider": True}, - { - "name": "Links", - "value": "\n".join( - [ - f"• [{link['label']}]({link['url']})" - for link in data["links"] - ] - ), - }, + "name": "Link", + "value": f"[View Detection]({link['url']})", + "divider": False, + } + for link in data["links"] + ], + "links": [ + {"label": link["label"], "url": link["url"]} + for link in data["links"] ], - "links": data["links"], } for tid, data in techniques.items() ], From 0ec47772a1650e4e9d764d2db95def7fdff3c613 Mon Sep 17 00:00:00 2001 From: Jose Hernandez Date: Thu, 5 Jun 2025 16:59:55 -0700 Subject: [PATCH 3/4] fixing base on feedback --- contentctl/output/attack_nav_output.py | 28 ++++++++++++++------------ 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/contentctl/output/attack_nav_output.py b/contentctl/output/attack_nav_output.py index a433b03b..f83d0087 100644 --- a/contentctl/output/attack_nav_output.py +++ b/contentctl/output/attack_nav_output.py @@ -1,35 +1,37 @@ +# Standard library imports import json import pathlib from datetime import datetime -from typing import Any, Dict, List, Set, TypedDict, Union +from typing import Any, TypedDict, Union +# Third-party imports from contentctl.objects.detection import Detection class TechniqueData(TypedDict): score: int - file_paths: List[str] - links: List[Dict[str, str]] + file_paths: list[str] + links: list[dict[str, str]] class LayerData(TypedDict): name: str - versions: Dict[str, str] + versions: dict[str, str] domain: str description: str - filters: Dict[str, List[str]] + filters: dict[str, list[str]] sorting: int - layout: Dict[str, Union[str, bool]] + layout: dict[str, Union[str, bool]] hideDisabled: bool - techniques: List[Dict[str, Any]] - gradient: Dict[str, Union[List[str], int]] - legendItems: List[Dict[str, str]] + techniques: list[dict[str, Any]] + gradient: dict[str, Union[list[str], int]] + legendItems: list[dict[str, str]] showTacticRowBackground: bool tacticRowBackground: str selectTechniquesAcrossTactics: bool selectSubtechniquesWithParent: bool selectVisibleTechniques: bool - metadata: List[Dict[str, str]] + metadata: list[dict[str, str]] class AttackNavOutput: @@ -44,7 +46,7 @@ def __init__( self.layer_domain = layer_domain def writeObjects( - self, detections: List[Detection], output_path: pathlib.Path + self, detections: list[Detection], output_path: pathlib.Path ) -> None: """ Generate MITRE ATT&CK Navigator layer file from detections @@ -52,8 +54,8 @@ def writeObjects( detections: List of Detection objects output_path: Path to write the layer file """ - techniques: Dict[str, TechniqueData] = {} - tactic_coverage: Dict[str, Set[str]] = {} + techniques: dict[str, TechniqueData] = {} + tactic_coverage: dict[str, set[str]] = {} # Process each detection for detection in detections: From dba378af695026d5c1079806f610279bae2c1330 Mon Sep 17 00:00:00 2001 From: Eric McGinnis Date: Tue, 10 Jun 2025 09:20:00 -0700 Subject: [PATCH 4/4] Remove outdated annotation syntax. Bump bugfix release in pyproject. Remove pandas requirement, which is not used, from pyproject.toml --- contentctl/enrichments/attack_enrichment.py | 30 ++++++++++----------- contentctl/output/attack_nav_output.py | 6 ++--- pyproject.toml | 3 +-- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index 6a111760..33729f72 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -3,7 +3,7 @@ import logging from dataclasses import field from pathlib import Path -from typing import Any, Dict, List, TypedDict, cast +from typing import Any, TypedDict, cast from attackcti import attack_client # type: ignore[reportMissingTypeStubs] from pydantic import BaseModel @@ -24,7 +24,7 @@ class AttackPattern(TypedDict): id: str technique_id: str technique: str - tactic: List[str] + tactic: list[str] class IntrusionSet(TypedDict): @@ -38,7 +38,7 @@ class Relationship(TypedDict): class AttackEnrichment(BaseModel): - data: Dict[str, MitreAttackEnrichment] = field(default_factory=dict) + data: dict[str, MitreAttackEnrichment] = field(default_factory=dict) use_enrichment: bool = True @staticmethod @@ -64,7 +64,7 @@ def getEnrichmentByMitreID( ) def addMitreIDViaGroupNames( - self, technique: Dict[str, Any], tactics: List[str], groupNames: List[str] + self, technique: dict[str, Any], tactics: list[str], groupNames: list[str] ) -> None: technique_id = technique["technique_id"] technique_obj = technique["technique"] @@ -84,15 +84,15 @@ def addMitreIDViaGroupNames( def addMitreIDViaGroupObjects( self, - technique: Dict[str, Any], - tactics: List[MitreTactics], - groupDicts: List[Dict[str, Any]], + technique: dict[str, Any], + tactics: list[MitreTactics], + groupDicts: list[dict[str, Any]], ) -> None: technique_id = technique["technique_id"] technique_obj = technique["technique"] tactics.sort() - groupNames: List[str] = sorted([group["group"] for group in groupDicts]) + groupNames: list[str] = sorted([group["group"] for group in groupDicts]) if technique_id in self.data: raise Exception(f"Error, trying to redefine MITRE ID '{technique_id}'") @@ -109,8 +109,8 @@ def addMitreIDViaGroupObjects( def get_attack_lookup( self, input_path: Path, enrichments: bool = False - ) -> Dict[str, MitreAttackEnrichment]: - attack_lookup: Dict[str, MitreAttackEnrichment] = {} + ) -> dict[str, MitreAttackEnrichment]: + attack_lookup: dict[str, MitreAttackEnrichment] = {} if not enrichments: return attack_lookup @@ -141,17 +141,17 @@ def get_attack_lookup( ) all_enterprise_techniques = cast( - List[AttackPattern], lift.get_enterprise_techniques(stix_format=False) + list[AttackPattern], lift.get_enterprise_techniques(stix_format=False) ) enterprise_relationships = cast( - List[Relationship], lift.get_enterprise_relationships(stix_format=False) + list[Relationship], lift.get_enterprise_relationships(stix_format=False) ) enterprise_groups = cast( - List[IntrusionSet], lift.get_enterprise_groups(stix_format=False) + list[IntrusionSet], lift.get_enterprise_groups(stix_format=False) ) for technique in all_enterprise_techniques: - apt_groups: List[Dict[str, Any]] = [] + apt_groups: list[dict[str, Any]] = [] for relationship in enterprise_relationships: if relationship["target_object"] == technique[ "id" @@ -160,7 +160,7 @@ def get_attack_lookup( if relationship["source_object"] == group["id"]: apt_groups.append(dict(group)) - tactics: List[MitreTactics] = [] + tactics: list[MitreTactics] = [] if "tactic" in technique: for tactic in technique["tactic"]: tactics.append( diff --git a/contentctl/output/attack_nav_output.py b/contentctl/output/attack_nav_output.py index f83d0087..abd1c44f 100644 --- a/contentctl/output/attack_nav_output.py +++ b/contentctl/output/attack_nav_output.py @@ -2,7 +2,7 @@ import json import pathlib from datetime import datetime -from typing import Any, TypedDict, Union +from typing import Any, TypedDict # Third-party imports from contentctl.objects.detection import Detection @@ -21,10 +21,10 @@ class LayerData(TypedDict): description: str filters: dict[str, list[str]] sorting: int - layout: dict[str, Union[str, bool]] + layout: dict[str, str | bool] hideDisabled: bool techniques: list[dict[str, Any]] - gradient: dict[str, Union[list[str], int]] + gradient: dict[str, list[str] | int] legendItems: list[dict[str, str]] showTacticRowBackground: bool tacticRowBackground: str diff --git a/pyproject.toml b/pyproject.toml index c6eb0984..1337b83d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "contentctl" -version = "5.5.3" +version = "5.5.4" description = "Splunk Content Control Tool" authors = ["STRT "] @@ -30,7 +30,6 @@ tqdm = "^4.66.5" pygit2 = "^1.15.1" tyro = "^0.9.2" gitpython = "^3.1.43" -pandas = "^2.3.0" setuptools = ">=69.5.1,<81.0.0" rich = "^14.0.0"