diff --git a/contentctl/objects/base_security_event.py b/contentctl/objects/base_security_event.py new file mode 100644 index 00000000..090659d0 --- /dev/null +++ b/contentctl/objects/base_security_event.py @@ -0,0 +1,28 @@ +from abc import ABC, abstractmethod + +from pydantic import BaseModel, ConfigDict + +from contentctl.objects.detection import Detection + + +class BaseSecurityEvent(BaseModel, ABC): + """ + Base event class for a Splunk security event (e.g. risks and notables) + """ + + # The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule") + search_name: str + + # The search ID that found that generated this event + orig_sid: str + + # Allowing fields that aren't explicitly defined to be passed since some of the risk/notable + # event's fields vary depending on the SPL which generated them + model_config = ConfigDict(extra="allow") + + @abstractmethod + def validate_against_detection(self, detection: Detection) -> None: + """ + Validate this risk/notable event against the given detection + """ + raise NotImplementedError() diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 4306cc8e..32643c68 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -18,6 +18,7 @@ format_pbar_string, # type: ignore ) from contentctl.helper.utils import Utils +from contentctl.objects.base_security_event import BaseSecurityEvent from contentctl.objects.base_test_result import TestResultStatus from contentctl.objects.detection import Detection from contentctl.objects.errors import ( @@ -222,6 +223,9 @@ class CorrelationSearch(BaseModel): # The list of risk events found _risk_events: list[RiskEvent] | None = PrivateAttr(default=None) + # The list of risk data model events found + _risk_dm_events: list[BaseSecurityEvent] | None = PrivateAttr(default=None) + # The list of notable events found _notable_events: list[NotableEvent] | None = PrivateAttr(default=None) @@ -554,6 +558,13 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: raise events.append(event) self.logger.debug(f"Found risk event for '{self.name}': {event}") + else: + msg = ( + f"Found event for unexpected index ({result['index']}) in our query " + f"results (expected {Indexes.RISK_INDEX})" + ) + self.logger.error(msg) + raise ValueError(msg) except ServerError as e: self.logger.error(f"Error returned from Splunk instance: {e}") raise e @@ -623,6 +634,13 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]: raise events.append(event) self.logger.debug(f"Found notable event for '{self.name}': {event}") + else: + msg = ( + f"Found event for unexpected index ({result['index']}) in our query " + f"results (expected {Indexes.NOTABLE_INDEX})" + ) + self.logger.error(msg) + raise ValueError(msg) except ServerError as e: self.logger.error(f"Error returned from Splunk instance: {e}") raise e @@ -637,15 +655,119 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]: return events + def risk_dm_event_exists(self) -> bool: + """Whether at least one matching risk data model event exists + + Queries the `risk` data model and returns True if at least one matching event (could come + from risk or notable index) exists for this search + :return: a bool indicating whether a risk data model event for this search exists in the + risk data model + """ + # We always force an update on the cache when checking if events exist + events = self.get_risk_dm_events(force_update=True) + return len(events) > 0 + + def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEvent]: + """Get risk data model events from the Splunk instance + + Queries the `risk` data model and returns any matching events (could come from risk or + notable index) + :param force_update: whether the cached _risk_events should be forcibly updated if already + set + :return: a list of risk events + """ + # Reset the list of risk data model events if we're forcing an update + if force_update: + self.logger.debug("Resetting risk data model event cache.") + self._risk_dm_events = None + + # Use the cached risk_dm_events unless we're forcing an update + if self._risk_dm_events is not None: + self.logger.debug( + f"Using cached risk data model events ({len(self._risk_dm_events)} total)." + ) + return self._risk_dm_events + + # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID + # Search for all risk data model events from a single scheduled search (indicated by + # orig_sid) + query = ( + f'datamodel Risk All_Risk flat | search search_name="{self.name}" [datamodel Risk ' + f'All_Risk flat | search search_name="{self.name}" | tail 1 | fields orig_sid] ' + "| tojson" + ) + result_iterator = self._search(query) + + # Iterate over the events, storing them in a list and checking for any errors + events: list[BaseSecurityEvent] = [] + risk_count = 0 + notable_count = 0 + try: + for result in result_iterator: + # sanity check that this result from the iterator is a risk event and not some + # other metadata + if result["index"] == Indexes.RISK_INDEX: + try: + parsed_raw = json.loads(result["_raw"]) + event = RiskEvent.model_validate(parsed_raw) + except Exception: + self.logger.error( + f"Failed to parse RiskEvent from search result: {result}" + ) + raise + events.append(event) + risk_count += 1 + self.logger.debug( + f"Found risk event in risk data model for '{self.name}': {event}" + ) + elif result["index"] == Indexes.NOTABLE_INDEX: + try: + parsed_raw = json.loads(result["_raw"]) + event = NotableEvent.model_validate(parsed_raw) + except Exception: + self.logger.error( + f"Failed to parse NotableEvent from search result: {result}" + ) + raise + events.append(event) + notable_count += 1 + self.logger.debug( + f"Found notable event in risk data model for '{self.name}': {event}" + ) + else: + msg = ( + f"Found event for unexpected index ({result['index']}) in our query " + f"results (expected {Indexes.NOTABLE_INDEX} or {Indexes.RISK_INDEX})" + ) + self.logger.error(msg) + raise ValueError(msg) + except ServerError as e: + self.logger.error(f"Error returned from Splunk instance: {e}") + raise e + + # Log if no events were found + if len(events) < 1: + self.logger.debug(f"No events found in risk data model for '{self.name}'") + else: + # Set the cache if we found events + self._risk_dm_events = events + self.logger.debug( + f"Caching {len(self._risk_dm_events)} risk data model events." + ) + + # Log counts of risk and notable events found + self.logger.debug( + f"Found {risk_count} risk events and {notable_count} notable events in the risk data " + "model" + ) + + return events + def validate_risk_events(self) -> None: """Validates the existence of any expected risk events First ensure the risk event exists, and if it does validate its risk message and make sure - any events align with the specified risk object. Also adds the risk index to the purge list - if risk events existed - :param elapsed_sleep_time: an int representing the amount of time slept thus far waiting to - check the risks/notables - :returns: an IntegrationTestResult on failure; None on success + any events align with the specified risk object. """ # Ensure the rba object is defined if self.detection.rba is None: @@ -735,13 +857,29 @@ def validate_risk_events(self) -> None: def validate_notable_events(self) -> None: """Validates the existence of any expected notables - Ensures the notable exists. Also adds the notable index to the purge list if notables - existed - :param elapsed_sleep_time: an int representing the amount of time slept thus far waiting to - check the risks/notables - :returns: an IntegrationTestResult on failure; None on success + Check various fields within the notable to ensure alignment with the detection definition. + Additionally, ensure that the notable does not appear in the risk data model, as this is + currently undesired behavior for ESCU detections. + """ + if self.notable_in_risk_dm(): + raise ValidationFailed( + "One or more notables appeared in the risk data model. This could lead to risk " + "score doubling, and/or notable multiplexing, depending on the detection type " + "(e.g. TTP), or the number of risk modifiers." + ) + + def notable_in_risk_dm(self) -> bool: + """Check if notables are in the risk data model + + Returns a bool indicating whether notables are in the risk data model or not. + + :returns: a bool, True if notables are in the risk data model results; False if not """ - raise NotImplementedError() + if self.risk_dm_event_exists(): + for event in self.get_risk_dm_events(): + if isinstance(event, NotableEvent): + return True + return False # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls # it for completion, but that seems more tricky @@ -828,8 +966,8 @@ def test( try: # Validate risk events - self.logger.debug("Checking for matching risk events") if self.has_risk_analysis_action: + self.logger.debug("Checking for matching risk events") if self.risk_event_exists(): # TODO (PEX-435): should this in the retry loop? or outside it? # -> I've observed there being a missing risk event (15/16) on @@ -846,22 +984,28 @@ def test( raise ValidationFailed( f"TEST FAILED: No matching risk event created for: {self.name}" ) + else: + self.logger.debug( + f"No risk action defined for '{self.name}'" + ) # Validate notable events - self.logger.debug("Checking for matching notable events") if self.has_notable_action: + self.logger.debug("Checking for matching notable events") # NOTE: because we check this last, if both fail, the error message about notables will # always be the last to be added and thus the one surfaced to the user if self.notable_event_exists(): # TODO (PEX-435): should this in the retry loop? or outside it? - # TODO (PEX-434): implement deeper notable validation (the method - # commented out below is unimplemented) - # self.validate_notable_events(elapsed_sleep_time) + self.validate_notable_events() pass else: raise ValidationFailed( f"TEST FAILED: No matching notable event created for: {self.name}" ) + else: + self.logger.debug( + f"No notable action defined for '{self.name}'" + ) except ValidationFailed as e: self.logger.error(f"Risk/notable validation failed: {e}") result = IntegrationTestResult( @@ -1015,6 +1159,7 @@ def cleanup(self, delete_test_index: bool = False) -> None: # reset caches self._risk_events = None self._notable_events = None + self._risk_dm_events = None def update_pbar(self, state: str) -> str: """ diff --git a/contentctl/objects/notable_event.py b/contentctl/objects/notable_event.py index 31f8a959..c863eaa3 100644 --- a/contentctl/objects/notable_event.py +++ b/contentctl/objects/notable_event.py @@ -1,19 +1,12 @@ -from pydantic import ConfigDict, BaseModel - +from contentctl.objects.base_security_event import BaseSecurityEvent from contentctl.objects.detection import Detection -# TODO (PEX-434): implement deeper notable validation -class NotableEvent(BaseModel): - # The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule") - search_name: str - - # The search ID that found that generated this risk event - orig_sid: str - - # Allowing fields that aren't explicitly defined to be passed since some of the risk event's - # fields vary depending on the SPL which generated them - model_config = ConfigDict(extra="allow") +class NotableEvent(BaseSecurityEvent): + # TODO (PEX-434): implement deeper notable validation def validate_against_detection(self, detection: Detection) -> None: + """ + Validate this risk/notable event against the given detection + """ raise NotImplementedError() diff --git a/contentctl/objects/risk_event.py b/contentctl/objects/risk_event.py index c2460891..e6fae253 100644 --- a/contentctl/objects/risk_event.py +++ b/contentctl/objects/risk_event.py @@ -1,26 +1,17 @@ import re from functools import cached_property -from pydantic import ( - BaseModel, - ConfigDict, - Field, - PrivateAttr, - computed_field, - field_validator, -) +from pydantic import Field, PrivateAttr, computed_field, field_validator +from contentctl.objects.base_security_event import BaseSecurityEvent from contentctl.objects.detection import Detection from contentctl.objects.errors import ValidationFailed from contentctl.objects.rba import RiskObject -class RiskEvent(BaseModel): +class RiskEvent(BaseSecurityEvent): """Model for risk event in ES""" - # The search name (e.g. "ESCU - Windows Modify Registry EnableLinkedConnections - Rule") - search_name: str - # The subject of the risk event (e.g. a username, process name, system name, account ID, etc.) # (not to be confused w/ the risk object from the detection) es_risk_object: int | str = Field(alias="risk_object") @@ -32,9 +23,6 @@ class RiskEvent(BaseModel): # The level of risk associated w/ the risk event risk_score: int - # The search ID that found that generated this risk event - orig_sid: str - # The message for the risk event risk_message: str @@ -53,10 +41,6 @@ class RiskEvent(BaseModel): # Private attribute caching the risk object this RiskEvent is mapped to _matched_risk_object: RiskObject | None = PrivateAttr(default=None) - # Allowing fields that aren't explicitly defined to be passed since some of the risk event's - # fields vary depending on the SPL which generated them - model_config = ConfigDict(extra="allow") - @field_validator("annotations_mitre_attack", "analyticstories", mode="before") @classmethod def _convert_str_value_to_singleton(cls, v: str | list[str]) -> list[str]: diff --git a/pyproject.toml b/pyproject.toml index 0b97931f..c08f725d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "contentctl" -version = "5.3.2" +version = "5.4.0" description = "Splunk Content Control Tool" authors = ["STRT "]