diff --git a/contentctl/enrichments/attack_enrichment.py b/contentctl/enrichments/attack_enrichment.py index b437860d..33729f72 100644 --- a/contentctl/enrichments/attack_enrichment.py +++ b/contentctl/enrichments/attack_enrichment.py @@ -1,18 +1,40 @@ from __future__ import annotations -from attackcti import attack_client + import logging -from pydantic import BaseModel from dataclasses import field -from typing import Any from pathlib import Path +from typing import Any, TypedDict, cast + +from attackcti import attack_client # type: ignore[reportMissingTypeStubs] +from pydantic import BaseModel + +from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE +from contentctl.objects.config import validate from contentctl.objects.mitre_attack_enrichment import ( MitreAttackEnrichment, MitreTactics, ) -from contentctl.objects.config import validate -from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE +# Suppress attackcti logging logging.getLogger("taxii2client").setLevel(logging.CRITICAL) +logging.getLogger("stix2").setLevel(logging.CRITICAL) + + +class AttackPattern(TypedDict): + id: str + technique_id: str + technique: str + tactic: list[str] + + +class IntrusionSet(TypedDict): + id: str + group: str + + +class Relationship(TypedDict): + target_object: str + source_object: str class AttackEnrichment(BaseModel): @@ -98,11 +120,6 @@ def get_attack_lookup( end="", flush=True, ) - # The existence of the input_path is validated during cli argument validation, but it is - # possible that the repo is in the wrong format. If the following directories do not - # exist, then attack_client will fall back to resolving via REST API. We do not - # want this as it is slow and error prone, so we will force an exception to - # be generated. enterprise_path = input_path / "enterprise-attack" mobile_path = input_path / "ics-attack" ics_path = input_path / "mobile-attack" @@ -123,36 +140,47 @@ def get_attack_lookup( } ) - all_enterprise_techniques = lift.get_enterprise_techniques( - stix_format=False + all_enterprise_techniques = cast( + list[AttackPattern], lift.get_enterprise_techniques(stix_format=False) ) - enterprise_relationships = lift.get_enterprise_relationships( - stix_format=False + enterprise_relationships = cast( + list[Relationship], lift.get_enterprise_relationships(stix_format=False) + ) + enterprise_groups = cast( + list[IntrusionSet], lift.get_enterprise_groups(stix_format=False) ) - enterprise_groups = lift.get_enterprise_groups(stix_format=False) for technique in all_enterprise_techniques: apt_groups: list[dict[str, Any]] = [] for relationship in enterprise_relationships: - if ( - relationship["target_object"] == technique["id"] - ) and relationship["source_object"].startswith("intrusion-set"): + if relationship["target_object"] == technique[ + "id" + ] and relationship["source_object"].startswith("intrusion-set"): for group in enterprise_groups: if relationship["source_object"] == group["id"]: - apt_groups.append(group) - # apt_groups.append(group['group']) + apt_groups.append(dict(group)) - tactics = [] + tactics: list[MitreTactics] = [] if "tactic" in technique: for tactic in technique["tactic"]: - tactics.append(tactic.replace("-", " ").title()) - - self.addMitreIDViaGroupObjects(technique, tactics, apt_groups) - attack_lookup[technique["technique_id"]] = { - "technique": technique["technique"], - "tactics": tactics, - "groups": apt_groups, - } + tactics.append( + cast(MitreTactics, tactic.replace("-", " ").title()) + ) + + self.addMitreIDViaGroupObjects(dict(technique), tactics, apt_groups) + attack_lookup[technique["technique_id"]] = ( + MitreAttackEnrichment.model_validate( + { + "mitre_attack_id": technique["technique_id"], + "mitre_attack_technique": technique["technique"], + "mitre_attack_tactics": tactics, + "mitre_attack_groups": [ + group["group"] for group in apt_groups + ], + "mitre_attack_group_objects": apt_groups, + } + ) + ) except Exception as err: raise Exception(f"Error getting MITRE Enrichment: {str(err)}") diff --git a/contentctl/output/attack_nav_output.py b/contentctl/output/attack_nav_output.py index 61436fe7..abd1c44f 100644 --- a/contentctl/output/attack_nav_output.py +++ b/contentctl/output/attack_nav_output.py @@ -1,47 +1,192 @@ +# Standard library imports +import json import pathlib -from typing import List, Union +from datetime import datetime +from typing import Any, TypedDict +# Third-party imports from contentctl.objects.detection import Detection -from contentctl.output.attack_nav_writer import AttackNavWriter + + +class TechniqueData(TypedDict): + score: int + file_paths: list[str] + links: list[dict[str, str]] + + +class LayerData(TypedDict): + name: str + versions: dict[str, str] + domain: str + description: str + filters: dict[str, list[str]] + sorting: int + layout: dict[str, str | bool] + hideDisabled: bool + techniques: list[dict[str, Any]] + gradient: dict[str, list[str] | int] + legendItems: list[dict[str, str]] + showTacticRowBackground: bool + tacticRowBackground: str + selectTechniquesAcrossTactics: bool + selectSubtechniquesWithParent: bool + selectVisibleTechniques: bool + metadata: list[dict[str, str]] class AttackNavOutput: + def __init__( + self, + layer_name: str = "Splunk Detection Coverage", + layer_description: str = "MITRE ATT&CK coverage for Splunk detections", + layer_domain: str = "enterprise-attack", + ): + self.layer_name = layer_name + self.layer_description = layer_description + self.layer_domain = layer_domain + def writeObjects( - self, detections: List[Detection], output_path: pathlib.Path + self, detections: list[Detection], output_path: pathlib.Path ) -> None: - techniques: dict[str, dict[str, Union[List[str], int]]] = {} + """ + Generate MITRE ATT&CK Navigator layer file from detections + Args: + detections: List of Detection objects + output_path: Path to write the layer file + """ + techniques: dict[str, TechniqueData] = {} + tactic_coverage: dict[str, set[str]] = {} + # Process each detection for detection in detections: + if not hasattr(detection.tags, "mitre_attack_id"): + continue + for tactic in detection.tags.mitre_attack_id: if tactic not in techniques: - techniques[tactic] = {"score": 0, "file_paths": []} + techniques[tactic] = {"score": 0, "file_paths": [], "links": []} + tactic_coverage[tactic] = set() detection_type = detection.source - detection_id = detection.id + detection_id = str(detection.id) # Convert UUID to string + detection_url = ( + f"https://research.splunk.com/{detection_type}/{detection_id}/" + ) + detection_name = detection.name.replace( + "_", " " + ).title() # Convert to Title Case + detection_info = f"{detection_name}" - # Store all three pieces of information separately - detection_info = f"{detection_type}|{detection_id}|{detection.name}" + techniques[tactic]["score"] += 1 + techniques[tactic]["file_paths"].append(detection_info) + techniques[tactic]["links"].append( + {"label": detection_name, "url": detection_url} + ) + tactic_coverage[tactic].add(detection_id) - techniques[tactic]["score"] = techniques[tactic].get("score", 0) + 1 - if isinstance(techniques[tactic]["file_paths"], list): - techniques[tactic]["file_paths"].append(detection_info) + # Create the layer file + layer: LayerData = { + "name": self.layer_name, + "versions": { + "attack": "14", # Update as needed + "navigator": "5.1.0", + "layer": "4.5", + }, + "domain": self.layer_domain, + "description": self.layer_description, + "filters": { + "platforms": [ + "Windows", + "Linux", + "macOS", + "AWS", + "GCP", + "Azure", + "Office 365", + "SaaS", + ] + }, + "sorting": 0, + "layout": { + "layout": "flat", + "showName": True, + "showID": False, + "showAggregateScores": True, + "countUnscored": True, + "aggregateFunction": "average", + "expandedSubtechniques": "none", + }, + "hideDisabled": False, + "techniques": [ + { + "techniqueID": tid, + "score": data["score"], + "metadata": [ + {"name": "Detection", "value": name, "divider": False} + for name in data["file_paths"] + ] + + [ + { + "name": "Link", + "value": f"[View Detection]({link['url']})", + "divider": False, + } + for link in data["links"] + ], + "links": [ + {"label": link["label"], "url": link["url"]} + for link in data["links"] + ], + } + for tid, data in techniques.items() + ], + "gradient": { + "colors": [ + "#1a365d", # Dark blue + "#2c5282", # Medium blue + "#4299e1", # Light blue + "#48bb78", # Light green + "#38a169", # Medium green + "#276749", # Dark green + ], + "minValue": 0, + "maxValue": 5, # Adjust based on your max detections per technique + }, + "legendItems": [ + {"label": "1 Detection", "color": "#1a365d"}, + {"label": "2 Detections", "color": "#4299e1"}, + {"label": "3 Detections", "color": "#48bb78"}, + {"label": "4+ Detections", "color": "#276749"}, + ], + "showTacticRowBackground": True, + "tacticRowBackground": "#dddddd", + "selectTechniquesAcrossTactics": True, + "selectSubtechniquesWithParent": True, + "selectVisibleTechniques": False, + "metadata": [ + {"name": "Generated", "value": datetime.now().isoformat()}, + {"name": "Total Detections", "value": str(len(detections))}, + {"name": "Covered Techniques", "value": str(len(techniques))}, + ], + } - """ - for detection in objects: - if detection.tags.mitre_attack_enrichments: - for mitre_attack_enrichment in detection.tags.mitre_attack_enrichments: - if not mitre_attack_enrichment.mitre_attack_id in techniques: - techniques[mitre_attack_enrichment.mitre_attack_id] = { - 'score': 1, - 'file_paths': ['https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name)] - } - else: - techniques[mitre_attack_enrichment.mitre_attack_id]['score'] = techniques[mitre_attack_enrichment.mitre_attack_id]['score'] + 1 - techniques[mitre_attack_enrichment.mitre_attack_id]['file_paths'].append('https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name)) - """ - AttackNavWriter.writeAttackNavFile(techniques, output_path / "coverage.json") + # Write the layer file + output_file = output_path / "coverage.json" + with open(output_file, "w") as f: + json.dump(layer, f, indent=2) + + print(f"\n✅ MITRE ATT&CK Navigator layer file written to: {output_file}") + print("📊 Coverage Summary:") + print(f" Total Detections: {len(detections)}") + print(f" Covered Techniques: {len(techniques)}") + print(f" Tactics with Coverage: {len(tactic_coverage)}") + print("\n🗺️ To view the layer:") + print(" 1. Go to https://mitre-attack.github.io/attack-navigator/") + print(" 2. Click 'Open Existing Layer'") + print(f" 3. Select the file: {output_file}") - def convertNameToFileName(self, name: str): + def convertNameToFileName(self, name: str) -> str: + """Convert a detection name to a valid filename""" file_name = ( name.replace(" ", "_") .replace("-", "_") @@ -49,5 +194,4 @@ def convertNameToFileName(self, name: str): .replace("/", "_") .lower() ) - file_name = file_name + ".yml" - return file_name + return f"{file_name}.yml" diff --git a/pyproject.toml b/pyproject.toml index 1f6940fd..1337b83d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "contentctl" -version = "5.5.3" +version = "5.5.4" description = "Splunk Content Control Tool" authors = ["STRT "]