From aca2b3d1c6cc9761d8bda350f36d1a5142613607 Mon Sep 17 00:00:00 2001 From: Casey McGinley Date: Thu, 27 Mar 2025 10:49:49 -0700 Subject: [PATCH 1/3] adding risk data model validation --- contentctl/objects/base_security_event.py | 28 +++ contentctl/objects/correlation_search.py | 202 ++++++++++++++++++---- contentctl/objects/notable_event.py | 21 +-- contentctl/objects/risk_event.py | 22 +-- 4 files changed, 208 insertions(+), 65 deletions(-) create mode 100644 contentctl/objects/base_security_event.py diff --git a/contentctl/objects/base_security_event.py b/contentctl/objects/base_security_event.py new file mode 100644 index 00000000..090659d0 --- /dev/null +++ b/contentctl/objects/base_security_event.py @@ -0,0 +1,28 @@ +from abc import ABC, abstractmethod + +from pydantic import BaseModel, ConfigDict + +from contentctl.objects.detection import Detection + + +class BaseSecurityEvent(BaseModel, ABC): + """ + Base event class for a Splunk security event (e.g. risks and notables) + """ + + # The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule") + search_name: str + + # The search ID that found that generated this event + orig_sid: str + + # Allowing fields that aren't explicitly defined to be passed since some of the risk/notable + # event's fields vary depending on the SPL which generated them + model_config = ConfigDict(extra="allow") + + @abstractmethod + def validate_against_detection(self, detection: Detection) -> None: + """ + Validate this risk/notable event against the given detection + """ + raise NotImplementedError() diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 0cebf5cc..f5544e23 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -1,38 +1,38 @@ +import json import logging import time -import json -from typing import Any -from enum import StrEnum, IntEnum +from enum import IntEnum, StrEnum from functools import cached_property +from typing import Any -from pydantic import ConfigDict, BaseModel, computed_field, Field, PrivateAttr -from splunklib.results import JSONResultsReader, Message # type: ignore -from splunklib.binding import HTTPError, ResponseReader # type: ignore import splunklib.client as splunklib # type: ignore +from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, computed_field +from splunklib.binding import HTTPError, ResponseReader # type: ignore +from splunklib.results import JSONResultsReader, Message # type: ignore from tqdm import tqdm # type: ignore -from contentctl.objects.risk_analysis_action import RiskAnalysisAction -from contentctl.objects.notable_action import NotableAction -from contentctl.objects.base_test_result import TestResultStatus -from contentctl.objects.integration_test_result import IntegrationTestResult from contentctl.actions.detection_testing.progress_bar import ( - format_pbar_string, # type: ignore - TestReportingType, TestingStates, + TestReportingType, + format_pbar_string, # type: ignore ) +from contentctl.objects.base_security_event import BaseSecurityEvent +from contentctl.objects.base_test_result import TestResultStatus +from contentctl.objects.detection import Detection from contentctl.objects.errors import ( + ClientError, IntegrationTestingError, ServerError, - ClientError, ValidationFailed, ) -from contentctl.objects.detection import Detection -from contentctl.objects.risk_event import RiskEvent +from contentctl.objects.integration_test_result import IntegrationTestResult +from contentctl.objects.notable_action import NotableAction from contentctl.objects.notable_event import NotableEvent - +from contentctl.objects.risk_analysis_action import RiskAnalysisAction +from contentctl.objects.risk_event import RiskEvent # Suppress logging by default; enable for local testing -ENABLE_LOGGING = False +ENABLE_LOGGING = True LOG_LEVEL = logging.DEBUG LOG_PATH = "correlation_search.log" @@ -232,6 +232,9 @@ class CorrelationSearch(BaseModel): # The list of risk events found _risk_events: list[RiskEvent] | None = PrivateAttr(default=None) + # The list of risk data model events found + _risk_dm_events: list[BaseSecurityEvent] | None = PrivateAttr(default=None) + # The list of notable events found _notable_events: list[NotableEvent] | None = PrivateAttr(default=None) @@ -519,6 +522,9 @@ def risk_event_exists(self) -> bool: events = self.get_risk_events(force_update=True) return len(events) > 0 + # TODO (cmcginley): to minimize number of queries, perhaps filter these events from the + # returned risk dm events? --> I think no; we want to validate product behavior; we should + # instead compare the risk dm and the risk index (maybe...) def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: """Get risk events from the Splunk instance @@ -551,6 +557,8 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: events: list[RiskEvent] = [] try: for result in result_iterator: + # TODO (cmcginley): Do we need an else condition here for when the index is + # anything other than expected? # sanity check that this result from the iterator is a risk event and not some # other metadata if result["index"] == Indexes.RISK_INDEX: @@ -647,15 +655,116 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]: return events + def risk_dm_event_exists(self) -> bool: + """Whether at least one matching risk data model event exists + + Queries the `risk` data model and returns True if at least one matching event (could come + from risk or notable index) exists for this search + :return: a bool indicating whether a risk data model event for this search exists in the + risk data model + """ + # We always force an update on the cache when checking if events exist + events = self.get_risk_dm_events(force_update=True) + return len(events) > 0 + + def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEvent]: + """Get risk data model events from the Splunk instance + + Queries the `risk` data model and returns any matching events (could come from risk or + notable index) + :param force_update: whether the cached _risk_events should be forcibly updated if already + set + :return: a list of risk events + """ + # Reset the list of risk data model events if we're forcing an update + if force_update: + self.logger.debug("Resetting risk data model event cache.") + self._risk_dm_events = None + + # Use the cached risk_dm_events unless we're forcing an update + if self._risk_dm_events is not None: + self.logger.debug( + f"Using cached risk data model events ({len(self._risk_dm_events)} total)." + ) + return self._risk_dm_events + + # TODO (cmcginley): optimize this query? don't REALLY need the full events here for the + # depth of validation we're doing -> really just need the index + # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID + # Search for all risk data model events from a single scheduled search (indicated by + # orig_sid) + query = ( + f'datamodel Risk All_Risk flat | search search_name="{self.name}" [datamodel Risk ' + f'All_Risk flat | search search_name="{self.name}" | tail 1 | fields orig_sid] ' + "| tojson" + ) + result_iterator = self._search(query) + + # TODO (cmcginley): make parent structure for risk and notabel events for shared fields (** START HERE **) + # TODO (cmcginley): make new structure for risk DM events? parent structure for risk/notable events? + # Iterate over the events, storing them in a list and checking for any errors + events: list[BaseSecurityEvent] = [] + risk_count = 0 + notable_count = 0 + try: + for result in result_iterator: + # sanity check that this result from the iterator is a risk event and not some + # other metadata + if result["index"] == Indexes.RISK_INDEX: + try: + parsed_raw = json.loads(result["_raw"]) + event = RiskEvent.model_validate(parsed_raw) + except Exception: + self.logger.error( + f"Failed to parse RiskEvent from search result: {result}" + ) + raise + events.append(event) + risk_count += 1 + self.logger.debug( + f"Found risk event in risk data model for '{self.name}': {event}" + ) + elif result["index"] == Indexes.NOTABLE_INDEX: + try: + parsed_raw = json.loads(result["_raw"]) + event = NotableEvent.model_validate(parsed_raw) + except Exception: + self.logger.error( + f"Failed to parse NotableEvent from search result: {result}" + ) + raise + events.append(event) + notable_count += 1 + self.logger.debug( + f"Found notable event in risk data model for '{self.name}': {event}" + ) + except ServerError as e: + self.logger.error(f"Error returned from Splunk instance: {e}") + raise e + + # Log if no events were found + if len(events) < 1: + self.logger.debug(f"No events found in risk data model for '{self.name}'") + else: + # Set the cache if we found events + self._risk_dm_events = events + self.logger.debug( + f"Caching {len(self._risk_dm_events)} risk data model events." + ) + + # Log counts of risk and notable events found + self.logger.debug( + f"Found {risk_count} risk events and {notable_count} notable events in the risk data " + "model" + ) + + return events + def validate_risk_events(self) -> None: """Validates the existence of any expected risk events First ensure the risk event exists, and if it does validate its risk message and make sure - any events align with the specified risk object. Also adds the risk index to the purge list - if risk events existed - :param elapsed_sleep_time: an int representing the amount of time slept thus far waiting to - check the risks/notables - :returns: an IntegrationTestResult on failure; None on success + any events align with the specified risk object. """ # Ensure the rba object is defined if self.detection.rba is None: @@ -745,13 +854,33 @@ def validate_risk_events(self) -> None: def validate_notable_events(self) -> None: """Validates the existence of any expected notables - Ensures the notable exists. Also adds the notable index to the purge list if notables - existed - :param elapsed_sleep_time: an int representing the amount of time slept thus far waiting to - check the risks/notables - :returns: an IntegrationTestResult on failure; None on success + Check various fields within the notable to ensure alignment with the detection definition. + Additionally, ensure that the notable does not appear in the risk data model, as this is + currently undesired behavior for ESCU detections. + """ + if self.notable_in_risk_dm(): + raise ValidationFailed( + "One or more notables appeared in the risk data model. This could lead to risk " + "score doubling, and/or notable multiplexing, depending on the detection type " + "(e.g. TTP), or the number of risk modifiers." + ) + + # TODO (cmcginley): implement... Should this maybe be baked into the notable validation + # routine? since we are returning an integration test result; I think yes; get the risk dm + # events directly in the notable validation routine and ensure no notables are found in the + # data model + def notable_in_risk_dm(self) -> bool: + """Check if notables are in the risk data model + + Returns a bool indicating whether notables are in the risk data model or not. + + :returns: a bool, True if notables are in the risk data model results; False if not """ - raise NotImplementedError() + if self.risk_dm_event_exists(): + for event in self.get_risk_dm_events(): + if isinstance(event, NotableEvent): + return True + return False # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls # it for completion, but that seems more tricky @@ -838,8 +967,8 @@ def test( try: # Validate risk events - self.logger.debug("Checking for matching risk events") if self.has_risk_analysis_action: + self.logger.debug("Checking for matching risk events") if self.risk_event_exists(): # TODO (PEX-435): should this in the retry loop? or outside it? # -> I've observed there being a missing risk event (15/16) on @@ -856,22 +985,28 @@ def test( raise ValidationFailed( f"TEST FAILED: No matching risk event created for: {self.name}" ) + else: + self.logger.debug( + f"No risk action defined for '{self.name}'" + ) # Validate notable events - self.logger.debug("Checking for matching notable events") if self.has_notable_action: + self.logger.debug("Checking for matching notable events") # NOTE: because we check this last, if both fail, the error message about notables will # always be the last to be added and thus the one surfaced to the user if self.notable_event_exists(): # TODO (PEX-435): should this in the retry loop? or outside it? - # TODO (PEX-434): implement deeper notable validation (the method - # commented out below is unimplemented) - # self.validate_notable_events(elapsed_sleep_time) + self.validate_notable_events() pass else: raise ValidationFailed( f"TEST FAILED: No matching notable event created for: {self.name}" ) + else: + self.logger.debug( + f"No notable action defined for '{self.name}'" + ) except ValidationFailed as e: self.logger.error(f"Risk/notable validation failed: {e}") result = IntegrationTestResult( @@ -1025,6 +1160,7 @@ def cleanup(self, delete_test_index: bool = False) -> None: # reset caches self._risk_events = None self._notable_events = None + self._risk_dm_events = None def update_pbar(self, state: str) -> str: """ diff --git a/contentctl/objects/notable_event.py b/contentctl/objects/notable_event.py index 31f8a959..f43d47db 100644 --- a/contentctl/objects/notable_event.py +++ b/contentctl/objects/notable_event.py @@ -1,19 +1,14 @@ -from pydantic import ConfigDict, BaseModel - +from contentctl.objects.base_security_event import BaseSecurityEvent from contentctl.objects.detection import Detection -# TODO (PEX-434): implement deeper notable validation -class NotableEvent(BaseModel): - # The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule") - search_name: str - - # The search ID that found that generated this risk event - orig_sid: str - - # Allowing fields that aren't explicitly defined to be passed since some of the risk event's - # fields vary depending on the SPL which generated them - model_config = ConfigDict(extra="allow") +class NotableEvent(BaseSecurityEvent): + # TODO (PEX-434): implement deeper notable validation + # TODO (cmcginley): do I need to define the abstractmethods? + pass def validate_against_detection(self, detection: Detection) -> None: + """ + Validate this risk/notable event against the given detection + """ raise NotImplementedError() diff --git a/contentctl/objects/risk_event.py b/contentctl/objects/risk_event.py index c2460891..e6fae253 100644 --- a/contentctl/objects/risk_event.py +++ b/contentctl/objects/risk_event.py @@ -1,26 +1,17 @@ import re from functools import cached_property -from pydantic import ( - BaseModel, - ConfigDict, - Field, - PrivateAttr, - computed_field, - field_validator, -) +from pydantic import Field, PrivateAttr, computed_field, field_validator +from contentctl.objects.base_security_event import BaseSecurityEvent from contentctl.objects.detection import Detection from contentctl.objects.errors import ValidationFailed from contentctl.objects.rba import RiskObject -class RiskEvent(BaseModel): +class RiskEvent(BaseSecurityEvent): """Model for risk event in ES""" - # The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule") - search_name: str - # The subject of the risk event (e.g. a username, process name, system name, account ID, etc.) # (not to be confused w/ the risk object from the detection) es_risk_object: int | str = Field(alias="risk_object") @@ -32,9 +23,6 @@ class RiskEvent(BaseModel): # The level of risk associated w/ the risk event risk_score: int - # The search ID that found that generated this risk event - orig_sid: str - # The message for the risk event risk_message: str @@ -53,10 +41,6 @@ class RiskEvent(BaseModel): # Private attribute caching the risk object this RiskEvent is mapped to _matched_risk_object: RiskObject | None = PrivateAttr(default=None) - # Allowing fields that aren't explicitly defined to be passed since some of the risk event's - # fields vary depending on the SPL which generated them - model_config = ConfigDict(extra="allow") - @field_validator("annotations_mitre_attack", "analyticstories", mode="before") @classmethod def _convert_str_value_to_singleton(cls, v: str | list[str]) -> list[str]: From 300e87fdffbe9dc8e0e734368053e25c449a6c75 Mon Sep 17 00:00:00 2001 From: Casey McGinley Date: Thu, 27 Mar 2025 12:15:14 -0700 Subject: [PATCH 2/3] removing some TODOs and adding better logging --- contentctl/objects/correlation_search.py | 34 +++++++++++++++--------- contentctl/objects/notable_event.py | 2 -- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index b0ba188f..cbb85461 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -513,9 +513,6 @@ def risk_event_exists(self) -> bool: events = self.get_risk_events(force_update=True) return len(events) > 0 - # TODO (cmcginley): to minimize number of queries, perhaps filter these events from the - # returned risk dm events? --> I think no; we want to validate product behavior; we should - # instead compare the risk dm and the risk index (maybe...) def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: """Get risk events from the Splunk instance @@ -548,8 +545,6 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: events: list[RiskEvent] = [] try: for result in result_iterator: - # TODO (cmcginley): Do we need an else condition here for when the index is - # anything other than expected? # sanity check that this result from the iterator is a risk event and not some # other metadata if result["index"] == Indexes.RISK_INDEX: @@ -563,6 +558,13 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: raise events.append(event) self.logger.debug(f"Found risk event for '{self.name}': {event}") + else: + msg = ( + f"Found event for unexpected index ({result['index']}) in our query " + f"results (expected {Indexes.RISK_INDEX})" + ) + self.logger.error(msg) + raise ValueError(msg) except ServerError as e: self.logger.error(f"Error returned from Splunk instance: {e}") raise e @@ -632,6 +634,13 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]: raise events.append(event) self.logger.debug(f"Found notable event for '{self.name}': {event}") + else: + msg = ( + f"Found event for unexpected index ({result['index']}) in our query " + f"results (expected {Indexes.NOTABLE_INDEX})" + ) + self.logger.error(msg) + raise ValueError(msg) except ServerError as e: self.logger.error(f"Error returned from Splunk instance: {e}") raise e @@ -679,8 +688,6 @@ def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEve ) return self._risk_dm_events - # TODO (cmcginley): optimize this query? don't REALLY need the full events here for the - # depth of validation we're doing -> really just need the index # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID # Search for all risk data model events from a single scheduled search (indicated by # orig_sid) @@ -691,8 +698,6 @@ def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEve ) result_iterator = self._search(query) - # TODO (cmcginley): make parent structure for risk and notabel events for shared fields (** START HERE **) - # TODO (cmcginley): make new structure for risk DM events? parent structure for risk/notable events? # Iterate over the events, storing them in a list and checking for any errors events: list[BaseSecurityEvent] = [] risk_count = 0 @@ -729,6 +734,13 @@ def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEve self.logger.debug( f"Found notable event in risk data model for '{self.name}': {event}" ) + else: + msg = ( + f"Found event for unexpected index ({result['index']}) in our query " + f"results (expected {Indexes.NOTABLE_INDEX} or {Indexes.RISK_INDEX})" + ) + self.logger.error(msg) + raise ValueError(msg) except ServerError as e: self.logger.error(f"Error returned from Splunk instance: {e}") raise e @@ -856,10 +868,6 @@ def validate_notable_events(self) -> None: "(e.g. TTP), or the number of risk modifiers." ) - # TODO (cmcginley): implement... Should this maybe be baked into the notable validation - # routine? since we are returning an integration test result; I think yes; get the risk dm - # events directly in the notable validation routine and ensure no notables are found in the - # data model def notable_in_risk_dm(self) -> bool: """Check if notables are in the risk data model diff --git a/contentctl/objects/notable_event.py b/contentctl/objects/notable_event.py index f43d47db..c863eaa3 100644 --- a/contentctl/objects/notable_event.py +++ b/contentctl/objects/notable_event.py @@ -4,8 +4,6 @@ class NotableEvent(BaseSecurityEvent): # TODO (PEX-434): implement deeper notable validation - # TODO (cmcginley): do I need to define the abstractmethods? - pass def validate_against_detection(self, detection: Detection) -> None: """ From 60bef93a1805859115d40229b16c8ce07edd0ec8 Mon Sep 17 00:00:00 2001 From: pyth0n1c <87383215+pyth0n1c@users.noreply.github.com> Date: Tue, 29 Apr 2025 16:18:12 -0700 Subject: [PATCH 3/3] Disable logging before merging. Also bump contentctl version for new release --- contentctl/objects/correlation_search.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index cbb85461..32643c68 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -34,7 +34,7 @@ from contentctl.objects.risk_event import RiskEvent # Suppress logging by default; enable for local testing -ENABLE_LOGGING = True +ENABLE_LOGGING = False LOG_LEVEL = logging.DEBUG LOG_PATH = "correlation_search.log" diff --git a/pyproject.toml b/pyproject.toml index 0b97931f..c08f725d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "contentctl" -version = "5.3.2" +version = "5.4.0" description = "Splunk Content Control Tool" authors = ["STRT "]