Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 57 additions & 29 deletions contentctl/enrichments/attack_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
from __future__ import annotations
from attackcti import attack_client

import logging
from pydantic import BaseModel
from dataclasses import field
from typing import Any
from pathlib import Path
from typing import Any, TypedDict, cast

from attackcti import attack_client # type: ignore[reportMissingTypeStubs]
from pydantic import BaseModel

from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE
from contentctl.objects.config import validate
from contentctl.objects.mitre_attack_enrichment import (
MitreAttackEnrichment,
MitreTactics,
)
from contentctl.objects.config import validate
from contentctl.objects.annotated_types import MITRE_ATTACK_ID_TYPE

# Suppress attackcti logging
logging.getLogger("taxii2client").setLevel(logging.CRITICAL)
logging.getLogger("stix2").setLevel(logging.CRITICAL)


class AttackPattern(TypedDict):
id: str
technique_id: str
technique: str
tactic: list[str]


class IntrusionSet(TypedDict):
id: str
group: str


class Relationship(TypedDict):
target_object: str
source_object: str


class AttackEnrichment(BaseModel):
Expand Down Expand Up @@ -98,11 +120,6 @@ def get_attack_lookup(
end="",
flush=True,
)
# The existence of the input_path is validated during cli argument validation, but it is
# possible that the repo is in the wrong format. If the following directories do not
# exist, then attack_client will fall back to resolving via REST API. We do not
# want this as it is slow and error prone, so we will force an exception to
# be generated.
enterprise_path = input_path / "enterprise-attack"
mobile_path = input_path / "ics-attack"
ics_path = input_path / "mobile-attack"
Expand All @@ -123,36 +140,47 @@ def get_attack_lookup(
}
)

all_enterprise_techniques = lift.get_enterprise_techniques(
stix_format=False
all_enterprise_techniques = cast(
list[AttackPattern], lift.get_enterprise_techniques(stix_format=False)
)
enterprise_relationships = lift.get_enterprise_relationships(
stix_format=False
enterprise_relationships = cast(
list[Relationship], lift.get_enterprise_relationships(stix_format=False)
)
enterprise_groups = cast(
list[IntrusionSet], lift.get_enterprise_groups(stix_format=False)
)
enterprise_groups = lift.get_enterprise_groups(stix_format=False)

for technique in all_enterprise_techniques:
apt_groups: list[dict[str, Any]] = []
for relationship in enterprise_relationships:
if (
relationship["target_object"] == technique["id"]
) and relationship["source_object"].startswith("intrusion-set"):
if relationship["target_object"] == technique[
"id"
] and relationship["source_object"].startswith("intrusion-set"):
for group in enterprise_groups:
if relationship["source_object"] == group["id"]:
apt_groups.append(group)
# apt_groups.append(group['group'])
apt_groups.append(dict(group))

tactics = []
tactics: list[MitreTactics] = []
if "tactic" in technique:
for tactic in technique["tactic"]:
tactics.append(tactic.replace("-", " ").title())

self.addMitreIDViaGroupObjects(technique, tactics, apt_groups)
attack_lookup[technique["technique_id"]] = {
"technique": technique["technique"],
"tactics": tactics,
"groups": apt_groups,
}
tactics.append(
cast(MitreTactics, tactic.replace("-", " ").title())
)

self.addMitreIDViaGroupObjects(dict(technique), tactics, apt_groups)
attack_lookup[technique["technique_id"]] = (
MitreAttackEnrichment.model_validate(
{
"mitre_attack_id": technique["technique_id"],
"mitre_attack_technique": technique["technique"],
"mitre_attack_tactics": tactics,
"mitre_attack_groups": [
group["group"] for group in apt_groups
],
"mitre_attack_group_objects": apt_groups,
}
)
)

except Exception as err:
raise Exception(f"Error getting MITRE Enrichment: {str(err)}")
Expand Down
200 changes: 172 additions & 28 deletions contentctl/output/attack_nav_output.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,197 @@
# Standard library imports
import json
import pathlib
from typing import List, Union
from datetime import datetime
from typing import Any, TypedDict

# Third-party imports
from contentctl.objects.detection import Detection
from contentctl.output.attack_nav_writer import AttackNavWriter


class TechniqueData(TypedDict):
score: int
file_paths: list[str]
links: list[dict[str, str]]


class LayerData(TypedDict):
name: str
versions: dict[str, str]
domain: str
description: str
filters: dict[str, list[str]]
sorting: int
layout: dict[str, str | bool]
hideDisabled: bool
techniques: list[dict[str, Any]]
gradient: dict[str, list[str] | int]
legendItems: list[dict[str, str]]
showTacticRowBackground: bool
tacticRowBackground: str
selectTechniquesAcrossTactics: bool
selectSubtechniquesWithParent: bool
selectVisibleTechniques: bool
metadata: list[dict[str, str]]


class AttackNavOutput:
def __init__(
self,
layer_name: str = "Splunk Detection Coverage",
layer_description: str = "MITRE ATT&CK coverage for Splunk detections",
layer_domain: str = "enterprise-attack",
):
self.layer_name = layer_name
self.layer_description = layer_description
self.layer_domain = layer_domain

def writeObjects(
self, detections: List[Detection], output_path: pathlib.Path
self, detections: list[Detection], output_path: pathlib.Path
) -> None:
techniques: dict[str, dict[str, Union[List[str], int]]] = {}
"""
Generate MITRE ATT&CK Navigator layer file from detections
Args:
detections: List of Detection objects
output_path: Path to write the layer file
"""
techniques: dict[str, TechniqueData] = {}
tactic_coverage: dict[str, set[str]] = {}

# Process each detection
for detection in detections:
if not hasattr(detection.tags, "mitre_attack_id"):
continue

for tactic in detection.tags.mitre_attack_id:
if tactic not in techniques:
techniques[tactic] = {"score": 0, "file_paths": []}
techniques[tactic] = {"score": 0, "file_paths": [], "links": []}
tactic_coverage[tactic] = set()

detection_type = detection.source
detection_id = detection.id
detection_id = str(detection.id) # Convert UUID to string
detection_url = (
f"https://research.splunk.com/{detection_type}/{detection_id}/"
)
detection_name = detection.name.replace(
"_", " "
).title() # Convert to Title Case
detection_info = f"{detection_name}"

# Store all three pieces of information separately
detection_info = f"{detection_type}|{detection_id}|{detection.name}"
techniques[tactic]["score"] += 1
techniques[tactic]["file_paths"].append(detection_info)
techniques[tactic]["links"].append(
{"label": detection_name, "url": detection_url}
)
tactic_coverage[tactic].add(detection_id)

techniques[tactic]["score"] = techniques[tactic].get("score", 0) + 1
if isinstance(techniques[tactic]["file_paths"], list):
techniques[tactic]["file_paths"].append(detection_info)
# Create the layer file
layer: LayerData = {
"name": self.layer_name,
"versions": {
"attack": "14", # Update as needed
"navigator": "5.1.0",
"layer": "4.5",
},
"domain": self.layer_domain,
"description": self.layer_description,
"filters": {
"platforms": [
"Windows",
"Linux",
"macOS",
"AWS",
"GCP",
"Azure",
"Office 365",
"SaaS",
]
},
"sorting": 0,
"layout": {
"layout": "flat",
"showName": True,
"showID": False,
"showAggregateScores": True,
"countUnscored": True,
"aggregateFunction": "average",
"expandedSubtechniques": "none",
},
"hideDisabled": False,
"techniques": [
{
"techniqueID": tid,
"score": data["score"],
"metadata": [
{"name": "Detection", "value": name, "divider": False}
for name in data["file_paths"]
]
+ [
{
"name": "Link",
"value": f"[View Detection]({link['url']})",
"divider": False,
}
for link in data["links"]
],
"links": [
{"label": link["label"], "url": link["url"]}
for link in data["links"]
],
}
for tid, data in techniques.items()
],
"gradient": {
"colors": [
"#1a365d", # Dark blue
"#2c5282", # Medium blue
"#4299e1", # Light blue
"#48bb78", # Light green
"#38a169", # Medium green
"#276749", # Dark green
],
"minValue": 0,
"maxValue": 5, # Adjust based on your max detections per technique
},
"legendItems": [
{"label": "1 Detection", "color": "#1a365d"},
{"label": "2 Detections", "color": "#4299e1"},
{"label": "3 Detections", "color": "#48bb78"},
{"label": "4+ Detections", "color": "#276749"},
],
"showTacticRowBackground": True,
"tacticRowBackground": "#dddddd",
"selectTechniquesAcrossTactics": True,
"selectSubtechniquesWithParent": True,
"selectVisibleTechniques": False,
"metadata": [
{"name": "Generated", "value": datetime.now().isoformat()},
{"name": "Total Detections", "value": str(len(detections))},
{"name": "Covered Techniques", "value": str(len(techniques))},
],
}

"""
for detection in objects:
if detection.tags.mitre_attack_enrichments:
for mitre_attack_enrichment in detection.tags.mitre_attack_enrichments:
if not mitre_attack_enrichment.mitre_attack_id in techniques:
techniques[mitre_attack_enrichment.mitre_attack_id] = {
'score': 1,
'file_paths': ['https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name)]
}
else:
techniques[mitre_attack_enrichment.mitre_attack_id]['score'] = techniques[mitre_attack_enrichment.mitre_attack_id]['score'] + 1
techniques[mitre_attack_enrichment.mitre_attack_id]['file_paths'].append('https://github.com/splunk/security_content/blob/develop/detections/' + detection.getSource() + '/' + self.convertNameToFileName(detection.name))
"""
AttackNavWriter.writeAttackNavFile(techniques, output_path / "coverage.json")
# Write the layer file
output_file = output_path / "coverage.json"
with open(output_file, "w") as f:
json.dump(layer, f, indent=2)

print(f"\n✅ MITRE ATT&CK Navigator layer file written to: {output_file}")
print("📊 Coverage Summary:")
print(f" Total Detections: {len(detections)}")
print(f" Covered Techniques: {len(techniques)}")
print(f" Tactics with Coverage: {len(tactic_coverage)}")
print("\n🗺️ To view the layer:")
print(" 1. Go to https://mitre-attack.github.io/attack-navigator/")
print(" 2. Click 'Open Existing Layer'")
print(f" 3. Select the file: {output_file}")

def convertNameToFileName(self, name: str):
def convertNameToFileName(self, name: str) -> str:
"""Convert a detection name to a valid filename"""
file_name = (
name.replace(" ", "_")
.replace("-", "_")
.replace(".", "_")
.replace("/", "_")
.lower()
)
file_name = file_name + ".yml"
return file_name
return f"{file_name}.yml"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "contentctl"

version = "5.5.3"
version = "5.5.4"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bumped patch version in prep for release.

Also, pandas dependency was removed as it is not required.


description = "Splunk Content Control Tool"
authors = ["STRT <research@splunk.com>"]
Expand Down
Loading