diff --git a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py index 5f9fbdae..0c4a7098 100644 --- a/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py +++ b/contentctl/actions/detection_testing/infrastructures/DetectionTestingInfrastructure.py @@ -7,7 +7,6 @@ import time import urllib.parse import uuid -from shutil import copyfile from ssl import SSLEOFError, SSLZeroReturnError from sys import stdout from tempfile import TemporaryDirectory, mktemp @@ -1402,30 +1401,24 @@ def replay_attack_data_file( f"The only valid indexes on the server are {self.all_indexes_on_server}" ) - tempfile = mktemp(dir=tmp_dir) - if not ( - str(attack_data_file.data).startswith("http://") - or str(attack_data_file.data).startswith("https://") - ): - if pathlib.Path(str(attack_data_file.data)).is_file(): - self.format_pbar_string( - TestReportingType.GROUP, - test_group.name, - "Copying Data", - test_group_start_time, + # Runtime check to see if the attack_data repo exists. If so, check for the existence of the + # attack_data file(s) on disk. If it exists, use those files rather than a download of the file + if str(attack_data_file.data).startswith("https://"): + new_data_file = pathlib.Path( + str(attack_data_file.data).replace( + "https://media.githubusercontent.com/media/splunk/attack_data/master", + str(self.global_config.splunk_attack_data_path), ) + ) + if new_data_file.exists(): + attack_data_file.data = new_data_file - try: - copyfile(str(attack_data_file.data), tempfile) - except Exception as e: - raise Exception( - f"Error copying local Attack Data File for [{test_group.name}] - [{attack_data_file.data}]: " - f"{str(e)}" - ) - else: - raise Exception( - f"Attack Data File for [{test_group.name}] is local [{attack_data_file.data}], but does not exist." - ) + if not ( + str(attack_data_file.data).startswith("https://") + or str(attack_data_file.data).startswith("http://") + ): + # This is a file on the filesystem, so no need to download it + target_file = str(attack_data_file.data) else: # Download the file @@ -1441,9 +1434,11 @@ def replay_attack_data_file( start_time=test_group_start_time, ) + tempfile = mktemp(dir=tmp_dir) Utils.download_file_from_http( str(attack_data_file.data), tempfile, self.pbar, overwrite_file=True ) + target_file = tempfile except Exception as e: raise ( Exception( @@ -1459,7 +1454,7 @@ def replay_attack_data_file( start_time=test_group_start_time, ) - self.hec_raw_replay(tempfile, attack_data_file) + self.hec_raw_replay(target_file, attack_data_file) return attack_data_file.custom_index or self.sync_obj.replay_index diff --git a/contentctl/input/director.py b/contentctl/input/director.py index 5ff88cd4..fcc78381 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -200,6 +200,7 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None: context={ "output_dto": self.output_dto, "app": self.input_dto.app, + "config": self.input_dto, }, ) self.output_dto.addContentToDictMappings(detection) diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index e4149fea..5d00c9b0 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -289,6 +289,10 @@ def mitre_cti_repo_path(self) -> pathlib.Path: def atomic_red_team_repo_path(self): return self.external_repos_path / "atomic-red-team" + @property + def splunk_attack_data_path(self): + return self.external_repos_path / "attack_data" + @model_validator(mode="after") def ensureEnrichmentReposPresent(self) -> Self: """ diff --git a/contentctl/objects/test_attack_data.py b/contentctl/objects/test_attack_data.py index 5d5f9c80..1c567cd0 100644 --- a/contentctl/objects/test_attack_data.py +++ b/contentctl/objects/test_attack_data.py @@ -1,5 +1,14 @@ from __future__ import annotations -from pydantic import BaseModel, HttpUrl, FilePath, Field, ConfigDict + +from pydantic import ( + BaseModel, + ConfigDict, + Field, + FilePath, + HttpUrl, + ValidationInfo, + field_validator, +) class TestAttackData(BaseModel): @@ -11,3 +20,29 @@ class TestAttackData(BaseModel): sourcetype: str = Field(...) custom_index: str | None = None host: str | None = None + + @field_validator("data", mode="before") + @classmethod + def check_for_existence_of_attack_data_repo( + cls, value: HttpUrl | FilePath, info: ValidationInfo + ) -> HttpUrl | FilePath: + if info.context and info.context.get("config", None): + from contentctl.objects.config import validate + + config: validate = info.context.get("config", None) + + # trim off the beginning of the attack data url + STARTING = ( + "https://media.githubusercontent.com/media/splunk/attack_data/master/" + ) + if str(value).startswith(STARTING): + new_path = config.splunk_attack_data_path / str(value).replace( + STARTING, "" + ) + if new_path.is_file(): + # print("\ngreat we mapped the new path!") + + return new_path + else: + print(f"\n no new path :( - {value}") + return value