From b1aa8b9177e47cdd4d6d6836ba43ca4be94e54cb Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Fri, 13 Jun 2025 09:53:12 -0700 Subject: [PATCH 01/18] Use dispatch() to trigger savedsearch --- contentctl/objects/correlation_search.py | 94 ++++++++++++++++++------ 1 file changed, 73 insertions(+), 21 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 32643c68..6e6c790b 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -34,7 +34,7 @@ from contentctl.objects.risk_event import RiskEvent # Suppress logging by default; enable for local testing -ENABLE_LOGGING = False +ENABLE_LOGGING = True LOG_LEVEL = logging.DEBUG LOG_PATH = "correlation_search.log" @@ -88,7 +88,7 @@ class ScheduleConfig(StrEnum): EARLIEST_TIME = "-5y@y" LATEST_TIME = "-1m@m" - CRON_SCHEDULE = "*/1 * * * *" + CRON_SCHEDULE = "0 0 1 1 *" class ResultIterator: @@ -437,6 +437,18 @@ def enable(self, refresh: bool = True) -> None: if refresh: self.refresh() + def dispatch(self) -> splunklib.Job: + """Dispatches the SavedSearch + + Dispatches the SavedSearch entity, returning a Job object representing the search job. + :return: a splunklib.Job object representing the search job + """ + self.logger.debug(f"Dispatching {self.name}...") + try: + return self.saved_search.dispatch(trigger_actions=True) # type: ignore + except HTTPError as e: + raise ServerError(f"HTTP error encountered while dispatching detection: {e}") + def disable(self, refresh: bool = True) -> None: """Disables the SavedSearch @@ -496,6 +508,18 @@ def force_run(self, refresh: bool = True) -> None: self.update_timeframe(refresh=False) if not self.enabled: self.enable(refresh=False) + job = self.dispatch() + self.logger.info(f"Force running detection '{self.name}' with job ID: {job.sid}") + + time_to_execute = 0 + + # Check if the job is finished + while not job.is_done(): + self.logger.info(f"Job {job.sid} is still running...") + time.sleep(1) + time_to_execute += 1 + + self.logger.info(f"Job {job.sid} has finished running in {time_to_execute} seconds.") else: self.logger.warning(f"Detection '{self.name}' was already enabled") @@ -946,24 +970,52 @@ def test( self.update_pbar(TestingStates.FORCE_RUN) self.force_run() - # loop so long as the elapsed time is less than max_sleep - while elapsed_sleep_time < max_sleep: - # sleep so the detection job can finish - self.logger.info( - f"Waiting {time_to_sleep} for {self.name} so it can finish" - ) - self.update_pbar(TestingStates.VALIDATING) - time.sleep(time_to_sleep) - elapsed_sleep_time += time_to_sleep + # # loop so long as the elapsed time is less than max_sleep + # while elapsed_sleep_time < max_sleep: + # # sleep so the detection job can finish + # self.logger.info( + # f"Waiting {time_to_sleep} for {self.name} so it can finish" + # ) + # self.update_pbar(TestingStates.VALIDATING) + # # time.sleep(time_to_sleep) + # self.logger.info( + # f"Skipping sleeping time for testing purposes" + # ) + # elapsed_sleep_time += time_to_sleep + + # self.logger.info( + # f"Validating detection (attempt #{num_tries + 1} - {elapsed_sleep_time} seconds elapsed of " + # f"{max_sleep} max)" + # ) + + # # reset the result to None on each loop iteration + # result = None + + max_retries = 10 + initial_wait = 2 + max_wait = 60 + max_total_wait = 300 + + current_turn = 1 + wait_time = initial_wait + total_waited = 0 + + while current_turn <= max_retries and total_waited < max_total_wait: + current_turn += 1 self.logger.info( - f"Validating detection (attempt #{num_tries + 1} - {elapsed_sleep_time} seconds elapsed of " - f"{max_sleep} max)" + f"Skipping sleeping time for testing purposes" ) + if current_turn > 3: + time.sleep(wait_time) + total_waited += wait_time + self.logger.info(f"Waiting {wait_time}s before retry {current_turn}...") + + wait_time = min(wait_time * 2, max_wait) + # reset the result to None on each loop iteration result = None - try: # Validate risk events if self.has_risk_analysis_action: @@ -1023,15 +1075,15 @@ def test( ) break - # increment number of attempts to validate detection - num_tries += 1 + # # increment number of attempts to validate detection + # num_tries += 1 - # compute the next time to sleep for - time_to_sleep = 2**num_tries + # # compute the next time to sleep for + # time_to_sleep = 2**num_tries - # if the computed time to sleep will exceed max_sleep, adjust appropriately - if (elapsed_sleep_time + time_to_sleep) > max_sleep: - time_to_sleep = max_sleep - elapsed_sleep_time + # # if the computed time to sleep will exceed max_sleep, adjust appropriately + # if (elapsed_sleep_time + time_to_sleep) > max_sleep: + # time_to_sleep = max_sleep - elapsed_sleep_time # TODO (PEX-436): should cleanup be in a finally block so it runs even on exception? # cleanup the created events, disable the detection and return the result From af111b78e86c38c449c9db03f0e1548dd0ab4643 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Fri, 13 Jun 2025 13:16:19 -0700 Subject: [PATCH 02/18] Add rerun for savedsearch --- contentctl/objects/correlation_search.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 6e6c790b..400fcd15 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -36,7 +36,7 @@ # Suppress logging by default; enable for local testing ENABLE_LOGGING = True LOG_LEVEL = logging.DEBUG -LOG_PATH = "correlation_search.log" +LOG_PATH = "correlation_search_test2.log" class SavedSearchKeys(StrEnum): @@ -1013,6 +1013,20 @@ def test( self.logger.info(f"Waiting {wait_time}s before retry {current_turn}...") wait_time = min(wait_time * 2, max_wait) + + # Rerun the search job + job = self.dispatch() + self.logger.info(f"Force running detection '{self.name}' with job ID: {job.sid}") + + time_to_execute = 0 + + # Check if the job is finished + while not job.is_done(): + self.logger.info(f"Job {job.sid} is still running...") + time.sleep(1) + time_to_execute += 1 + + self.logger.info(f"Job {job.sid} has finished running in {time_to_execute} seconds.") # reset the result to None on each loop iteration result = None From 22def48124cdb1d1ed749a505ea48cd53ce91d32 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Wed, 18 Jun 2025 14:33:37 -0700 Subject: [PATCH 03/18] Update retry logic --- contentctl/objects/correlation_search.py | 195 ++++++++++------------- 1 file changed, 84 insertions(+), 111 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 400fcd15..eb15eedb 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -36,7 +36,7 @@ # Suppress logging by default; enable for local testing ENABLE_LOGGING = True LOG_LEVEL = logging.DEBUG -LOG_PATH = "correlation_search_test2.log" +LOG_PATH = "correlation_search.log" class SavedSearchKeys(StrEnum): @@ -904,6 +904,58 @@ def notable_in_risk_dm(self) -> bool: if isinstance(event, NotableEvent): return True return False + + def validate_risk_notable_events(self) -> IntegrationTestResult | None: + try: + # Validate risk events + if self.has_risk_analysis_action: + self.logger.debug("Checking for matching risk events") + if self.risk_event_exists(): + # TODO (PEX-435): should this in the retry loop? or outside it? + # -> I've observed there being a missing risk event (15/16) on + # the first few tries, so this does help us check for true + # positives; BUT, if we have lots of failing detections, this + # will definitely add to the total wait time + # -> certain types of failures (e.g. risk message, or any value + # checking) should fail testing automatically + # -> other types, like those based on counts of risk events, + # should happen should fail more slowly as more events may be + # produced + self.validate_risk_events() + else: + raise ValidationFailed( + f"TEST FAILED: No matching risk event created for: {self.name}" + ) + else: + self.logger.debug( + f"No risk action defined for '{self.name}'" + ) + + # Validate notable events + if self.has_notable_action: + self.logger.debug("Checking for matching notable events") + # NOTE: because we check this last, if both fail, the error message about notables will + # always be the last to be added and thus the one surfaced to the user + if self.notable_event_exists(): + # TODO (PEX-435): should this in the retry loop? or outside it? + self.validate_notable_events() + pass + else: + raise ValidationFailed( + f"TEST FAILED: No matching notable event created for: {self.name}" + ) + else: + self.logger.debug( + f"No notable action defined for '{self.name}'" + ) + + return None + except ValidationFailed as e: + self.logger.error(f"Risk/notable validation failed: {e}") + return IntegrationTestResult( + status=TestResultStatus.FAIL, + message=f"TEST FAILED: {e}" + ) # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls # it for completion, but that seems more tricky @@ -934,10 +986,6 @@ def test( # keep track of time slept and number of attempts for exponential backoff (base 2) elapsed_sleep_time = 0 - num_tries = 0 - - # set the initial base sleep time - time_to_sleep = TimeoutConfig.BASE_SLEEP try: # first make sure the indexes are currently empty and the detection is starting from a disabled state @@ -970,115 +1018,19 @@ def test( self.update_pbar(TestingStates.FORCE_RUN) self.force_run() - # # loop so long as the elapsed time is less than max_sleep - # while elapsed_sleep_time < max_sleep: - # # sleep so the detection job can finish - # self.logger.info( - # f"Waiting {time_to_sleep} for {self.name} so it can finish" - # ) - # self.update_pbar(TestingStates.VALIDATING) - # # time.sleep(time_to_sleep) - # self.logger.info( - # f"Skipping sleeping time for testing purposes" - # ) - # elapsed_sleep_time += time_to_sleep - - # self.logger.info( - # f"Validating detection (attempt #{num_tries + 1} - {elapsed_sleep_time} seconds elapsed of " - # f"{max_sleep} max)" - # ) - - # # reset the result to None on each loop iteration - # result = None - - max_retries = 10 - initial_wait = 2 - max_wait = 60 - max_total_wait = 300 + max_retries = 3 current_turn = 1 - wait_time = initial_wait - total_waited = 0 - while current_turn <= max_retries and total_waited < max_total_wait: + while current_turn <= max_retries: current_turn += 1 self.logger.info( f"Skipping sleeping time for testing purposes" ) - if current_turn > 3: - time.sleep(wait_time) - total_waited += wait_time - self.logger.info(f"Waiting {wait_time}s before retry {current_turn}...") - - wait_time = min(wait_time * 2, max_wait) - - # Rerun the search job - job = self.dispatch() - self.logger.info(f"Force running detection '{self.name}' with job ID: {job.sid}") - - time_to_execute = 0 - - # Check if the job is finished - while not job.is_done(): - self.logger.info(f"Job {job.sid} is still running...") - time.sleep(1) - time_to_execute += 1 - - self.logger.info(f"Job {job.sid} has finished running in {time_to_execute} seconds.") - # reset the result to None on each loop iteration - result = None - try: - # Validate risk events - if self.has_risk_analysis_action: - self.logger.debug("Checking for matching risk events") - if self.risk_event_exists(): - # TODO (PEX-435): should this in the retry loop? or outside it? - # -> I've observed there being a missing risk event (15/16) on - # the first few tries, so this does help us check for true - # positives; BUT, if we have lots of failing detections, this - # will definitely add to the total wait time - # -> certain types of failures (e.g. risk message, or any value - # checking) should fail testing automatically - # -> other types, like those based on counts of risk events, - # should happen should fail more slowly as more events may be - # produced - self.validate_risk_events() - else: - raise ValidationFailed( - f"TEST FAILED: No matching risk event created for: {self.name}" - ) - else: - self.logger.debug( - f"No risk action defined for '{self.name}'" - ) - - # Validate notable events - if self.has_notable_action: - self.logger.debug("Checking for matching notable events") - # NOTE: because we check this last, if both fail, the error message about notables will - # always be the last to be added and thus the one surfaced to the user - if self.notable_event_exists(): - # TODO (PEX-435): should this in the retry loop? or outside it? - self.validate_notable_events() - pass - else: - raise ValidationFailed( - f"TEST FAILED: No matching notable event created for: {self.name}" - ) - else: - self.logger.debug( - f"No notable action defined for '{self.name}'" - ) - except ValidationFailed as e: - self.logger.error(f"Risk/notable validation failed: {e}") - result = IntegrationTestResult( - status=TestResultStatus.FAIL, - message=f"TEST FAILED: {e}", - wait_duration=elapsed_sleep_time, - ) + result = self.validate_risk_notable_events() # if result is still None, then all checks passed and we can break the loop if result is None: @@ -1088,16 +1040,37 @@ def test( wait_duration=elapsed_sleep_time, ) break + + if result != None and result.status == TestResultStatus.FAIL: + elapsed = 0 - # # increment number of attempts to validate detection - # num_tries += 1 + for i in range(10): + time.sleep(10) + start_time = time.time() - # # compute the next time to sleep for - # time_to_sleep = 2**num_tries + job = self.dispatch() + time_to_execute = 0 + # Check if the job is finished + while not job.is_done(): + self.logger.info(f"Job {job.sid} is still running...") + time.sleep(1) + time_to_execute += 1 + self.logger.info(f"Job {job.sid} has finished running in {time_to_execute} seconds.") - # # if the computed time to sleep will exceed max_sleep, adjust appropriately - # if (elapsed_sleep_time + time_to_sleep) > max_sleep: - # time_to_sleep = max_sleep - elapsed_sleep_time + result = self.validate_risk_notable_events() + end_time = time.time() + + elapsed = elapsed + end_time - start_time + 10 + + if result is None: + result = IntegrationTestResult( + status=TestResultStatus.PASS, + message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}" + ) + self.logger.info( + f"Test passed in {i}th retry after {elapsed} seconds" + ) + break # TODO (PEX-436): should cleanup be in a finally block so it runs even on exception? # cleanup the created events, disable the detection and return the result From 48be3aedb6fda4c9c4fcc21b441b4bb86fd02de0 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Wed, 18 Jun 2025 16:10:03 -0700 Subject: [PATCH 04/18] Add debug logging --- contentctl/objects/correlation_search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index eb15eedb..4ecdf50d 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -1068,7 +1068,7 @@ def test( message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}" ) self.logger.info( - f"Test passed in {i}th retry after {elapsed} seconds" + f"Test passed in {i}th retry after {elapsed} seconds for: {self.name}" ) break From c0a3bea600745a3203b7483bfdcb02a160507081 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Fri, 20 Jun 2025 09:16:15 -0700 Subject: [PATCH 05/18] Fix lint error --- contentctl/objects/correlation_search.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 4ecdf50d..40d67f0f 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -1025,10 +1025,6 @@ def test( while current_turn <= max_retries: current_turn += 1 - self.logger.info( - f"Skipping sleeping time for testing purposes" - ) - # reset the result to None on each loop iteration result = self.validate_risk_notable_events() @@ -1041,7 +1037,7 @@ def test( ) break - if result != None and result.status == TestResultStatus.FAIL: + if result is not None and result.status == TestResultStatus.FAIL: elapsed = 0 for i in range(10): From dd74f913bd5e95e38c96a81f196ee4f7c9884733 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Fri, 20 Jun 2025 09:30:22 -0700 Subject: [PATCH 06/18] Fix format error --- contentctl/objects/correlation_search.py | 33 +++++++++++++----------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 40d67f0f..7038c12a 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -447,7 +447,9 @@ def dispatch(self) -> splunklib.Job: try: return self.saved_search.dispatch(trigger_actions=True) # type: ignore except HTTPError as e: - raise ServerError(f"HTTP error encountered while dispatching detection: {e}") + raise ServerError( + f"HTTP error encountered while dispatching detection: {e}" + ) def disable(self, refresh: bool = True) -> None: """Disables the SavedSearch @@ -509,7 +511,9 @@ def force_run(self, refresh: bool = True) -> None: if not self.enabled: self.enable(refresh=False) job = self.dispatch() - self.logger.info(f"Force running detection '{self.name}' with job ID: {job.sid}") + self.logger.info( + f"Force running detection '{self.name}' with job ID: {job.sid}" + ) time_to_execute = 0 @@ -519,7 +523,9 @@ def force_run(self, refresh: bool = True) -> None: time.sleep(1) time_to_execute += 1 - self.logger.info(f"Job {job.sid} has finished running in {time_to_execute} seconds.") + self.logger.info( + f"Job {job.sid} has finished running in {time_to_execute} seconds." + ) else: self.logger.warning(f"Detection '{self.name}' was already enabled") @@ -904,7 +910,7 @@ def notable_in_risk_dm(self) -> bool: if isinstance(event, NotableEvent): return True return False - + def validate_risk_notable_events(self) -> IntegrationTestResult | None: try: # Validate risk events @@ -927,9 +933,7 @@ def validate_risk_notable_events(self) -> IntegrationTestResult | None: f"TEST FAILED: No matching risk event created for: {self.name}" ) else: - self.logger.debug( - f"No risk action defined for '{self.name}'" - ) + self.logger.debug(f"No risk action defined for '{self.name}'") # Validate notable events if self.has_notable_action: @@ -945,16 +949,13 @@ def validate_risk_notable_events(self) -> IntegrationTestResult | None: f"TEST FAILED: No matching notable event created for: {self.name}" ) else: - self.logger.debug( - f"No notable action defined for '{self.name}'" - ) + self.logger.debug(f"No notable action defined for '{self.name}'") return None except ValidationFailed as e: self.logger.error(f"Risk/notable validation failed: {e}") return IntegrationTestResult( - status=TestResultStatus.FAIL, - message=f"TEST FAILED: {e}" + status=TestResultStatus.FAIL, message=f"TEST FAILED: {e}" ) # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls @@ -1036,7 +1037,7 @@ def test( wait_duration=elapsed_sleep_time, ) break - + if result is not None and result.status == TestResultStatus.FAIL: elapsed = 0 @@ -1051,7 +1052,9 @@ def test( self.logger.info(f"Job {job.sid} is still running...") time.sleep(1) time_to_execute += 1 - self.logger.info(f"Job {job.sid} has finished running in {time_to_execute} seconds.") + self.logger.info( + f"Job {job.sid} has finished running in {time_to_execute} seconds." + ) result = self.validate_risk_notable_events() end_time = time.time() @@ -1061,7 +1064,7 @@ def test( if result is None: result = IntegrationTestResult( status=TestResultStatus.PASS, - message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}" + message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}", ) self.logger.info( f"Test passed in {i}th retry after {elapsed} seconds for: {self.name}" From 9b09545df502be1b7263547728f32a5c02c3ae01 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Fri, 20 Jun 2025 11:16:54 -0700 Subject: [PATCH 07/18] Update rerun logic based on wait time data --- contentctl/objects/correlation_search.py | 74 ++++++++---------------- 1 file changed, 25 insertions(+), 49 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 7038c12a..77c4366d 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -445,7 +445,19 @@ def dispatch(self) -> splunklib.Job: """ self.logger.debug(f"Dispatching {self.name}...") try: - return self.saved_search.dispatch(trigger_actions=True) # type: ignore + job = self.saved_search.dispatch(trigger_actions=True) + + time_to_execute = 0 + # Check if the job is finished + while not job.is_done(): + self.logger.info(f"Job {job.sid} is still running...") + time.sleep(1) + time_to_execute += 1 + self.logger.info( + f"Job {job.sid} has finished running in {time_to_execute} seconds." + ) + + return job # type: ignore except HTTPError as e: raise ServerError( f"HTTP error encountered while dispatching detection: {e}" @@ -512,19 +524,7 @@ def force_run(self, refresh: bool = True) -> None: self.enable(refresh=False) job = self.dispatch() self.logger.info( - f"Force running detection '{self.name}' with job ID: {job.sid}" - ) - - time_to_execute = 0 - - # Check if the job is finished - while not job.is_done(): - self.logger.info(f"Job {job.sid} is still running...") - time.sleep(1) - time_to_execute += 1 - - self.logger.info( - f"Job {job.sid} has finished running in {time_to_execute} seconds." + f"Finished running detection '{self.name}' with job ID: {job.sid}" ) else: self.logger.warning(f"Detection '{self.name}' was already enabled") @@ -1019,12 +1019,17 @@ def test( self.update_pbar(TestingStates.FORCE_RUN) self.force_run() - max_retries = 3 + max_total_wait = 150 + wait_time = 2 + max_wait = 30 + time_elapsed = 0 - current_turn = 1 + while time_elapsed <= max_total_wait: + if time_elapsed > 90: + self.dispatch() - while current_turn <= max_retries: - current_turn += 1 + start_time = time.time() + time.sleep(wait_time) # reset the result to None on each loop iteration result = self.validate_risk_notable_events() @@ -1038,38 +1043,9 @@ def test( ) break - if result is not None and result.status == TestResultStatus.FAIL: - elapsed = 0 - - for i in range(10): - time.sleep(10) - start_time = time.time() - - job = self.dispatch() - time_to_execute = 0 - # Check if the job is finished - while not job.is_done(): - self.logger.info(f"Job {job.sid} is still running...") - time.sleep(1) - time_to_execute += 1 - self.logger.info( - f"Job {job.sid} has finished running in {time_to_execute} seconds." - ) - - result = self.validate_risk_notable_events() end_time = time.time() - - elapsed = elapsed + end_time - start_time + 10 - - if result is None: - result = IntegrationTestResult( - status=TestResultStatus.PASS, - message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}", - ) - self.logger.info( - f"Test passed in {i}th retry after {elapsed} seconds for: {self.name}" - ) - break + time_elapsed += end_time - start_time + wait_time = min(max_wait, wait_time * 2) # TODO (PEX-436): should cleanup be in a finally block so it runs even on exception? # cleanup the created events, disable the detection and return the result From b8e6c126f5219457f5ceb788ac70bd41152abefc Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Mon, 23 Jun 2025 08:47:46 -0700 Subject: [PATCH 08/18] Add some comments --- contentctl/objects/correlation_search.py | 74 +++++++++++++++++------- 1 file changed, 54 insertions(+), 20 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 77c4366d..10ae0ba1 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -69,14 +69,30 @@ class TimeoutConfig(IntEnum): """ # base amount to sleep for before beginning exponential backoff during testing - BASE_SLEEP = 60 + BASE_SLEEP = 2 # NOTE: Some detections take longer to generate their risk/notables than other; testing has - # shown 270s to likely be sufficient for all detections in 99% of runs; however we have - # encountered a handful of transient failures in the last few months. Since our success rate - # is at 100% now, we will round this to a flat 300s to accomodate these outliers. + # shown 30s to likely be sufficient for all detections in 99% of runs and less than 1% of detections + # would need 60s and 90s to wait for risk/notables; therefore 30s is a reasonable interval for max + # wait time. # Max amount to wait before timing out during exponential backoff - MAX_SLEEP = 300 + MAX_SLEEP = 30 + + # NOTE: Based on testing, 99% of detections will generate risk/notables within 30s, and the remaining 1% of + # detections may need up to 150s to finish; so this is a reasonable total maximum wait time + # Total wait time before giving up on waiting for risk/notables to be generated + TOTAL_MAX_WAIT = 180 + + # NOTE: Based on testing, there is 1% detections couldn't generate risk/notables within single dispatch, and + # they needed to be retried; 90s is a reasonable wait time before retrying dispatching the SavedSearch + # Wait time before retrying dispatching the SavedSearch + RETRY_DISPATCH = 90 + + # NOTE: Based on testing, 99% of detections will generate risk/notables within 30s, and the validation of risks + # and notables would take around 5 to 10 seconds; so before adding additional wait time, we let the validation + # process work as the default wait time until we reach the ADD_WAIT_TIME and add additional wait time + # Time elased before adding additional wait time + ADD_WAIT_TIME = 30 # TODO (#226): evaluate sane defaults for timeframe for integration testing (e.g. 5y is good @@ -441,7 +457,7 @@ def dispatch(self) -> splunklib.Job: """Dispatches the SavedSearch Dispatches the SavedSearch entity, returning a Job object representing the search job. - :return: a splunklib.Job object representing the search job + :return: a splunklib.Job object representing the search job when the SavedSearch is finished running """ self.logger.debug(f"Dispatching {self.name}...") try: @@ -911,7 +927,15 @@ def notable_in_risk_dm(self) -> bool: return True return False - def validate_risk_notable_events(self) -> IntegrationTestResult | None: + def validate_risk_notable_events(self) -> tuple[bool, str | None]: + """Validates the existence of risk and notable events + + Returns a bool indicating whether validating risks and notables is successful or not, + and a message indicating the reason for failure (if any). + + :return: True if validation passes, False if it fails; None if no message, or a string + message indicating the reason for failure + """ try: # Validate risk events if self.has_risk_analysis_action: @@ -951,12 +975,10 @@ def validate_risk_notable_events(self) -> IntegrationTestResult | None: else: self.logger.debug(f"No notable action defined for '{self.name}'") - return None + return True, None except ValidationFailed as e: self.logger.error(f"Risk/notable validation failed: {e}") - return IntegrationTestResult( - status=TestResultStatus.FAIL, message=f"TEST FAILED: {e}" - ) + return False, str(e) # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls # it for completion, but that seems more tricky @@ -1019,33 +1041,45 @@ def test( self.update_pbar(TestingStates.FORCE_RUN) self.force_run() - max_total_wait = 150 - wait_time = 2 - max_wait = 30 + max_total_wait = TimeoutConfig.TOTAL_MAX_WAIT + wait_time = TimeoutConfig.BASE_SLEEP + max_wait = TimeoutConfig.MAX_SLEEP time_elapsed = 0 while time_elapsed <= max_total_wait: - if time_elapsed > 90: + # wait at least 90 seconds to rerun the SavedSearch + if time_elapsed > TimeoutConfig.RETRY_DISPATCH: self.dispatch() start_time = time.time() - time.sleep(wait_time) + + # wait at least 30 seconds before adding to the wait time + if time_elapsed > TimeoutConfig.ADD_WAIT_TIME: + time.sleep(wait_time) + elapsed_sleep_time += wait_time + wait_time = min(max_wait, wait_time * 2) # reset the result to None on each loop iteration - result = self.validate_risk_notable_events() + result = None + + validate_pass, error = self.validate_risk_notable_events() - # if result is still None, then all checks passed and we can break the loop - if result is None: + # if result is True, then all checks passed and we can break the loop + if validate_pass: result = IntegrationTestResult( status=TestResultStatus.PASS, message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}", wait_duration=elapsed_sleep_time, ) break + else: + result = IntegrationTestResult( + status=TestResultStatus.FAIL, + message=f"TEST FAILED: {error}", + ) end_time = time.time() time_elapsed += end_time - start_time - wait_time = min(max_wait, wait_time * 2) # TODO (PEX-436): should cleanup be in a finally block so it runs even on exception? # cleanup the created events, disable the detection and return the result From 1349ff0c1ccae6b54aa562e48d14636ef4625e1e Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Mon, 23 Jun 2025 11:22:08 -0700 Subject: [PATCH 09/18] Refactor risk/notable query to dispatch search sid --- contentctl/objects/correlation_search.py | 31 ++++++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 10ae0ba1..8b7999c6 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -218,6 +218,9 @@ class CorrelationSearch(BaseModel): # cleanup of this index test_index: str | None = Field(default=None, min_length=1) + # The search ID of the last dispatched search; this is used to query for risk/notable events + sid: str | None = Field(default=None) + # The logger to use (logs all go to a null pipe unless ENABLE_LOGGING is set to True, so as not # to conflict w/ tqdm) logger: logging.Logger = Field( @@ -473,6 +476,8 @@ def dispatch(self) -> splunklib.Job: f"Job {job.sid} has finished running in {time_to_execute} seconds." ) + self.sid = job.sid + return job # type: ignore except HTTPError as e: raise ServerError( @@ -581,10 +586,15 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID # Search for all risk events from a single scheduled search (indicated by orig_sid) - query = ( - f'search index=risk search_name="{self.name}" [search index=risk search ' - f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson' - ) + if self.sid is None: + # query for validating detection is starting from a disabled state + query = ( + f'search index=risk search_name="{self.name}" [search index=risk search ' + f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson' + ) + else: + # query after the detection has been enabled and dispatched + query = f'search index=risk search_name="{self.name}" orig_sid="{self.sid}" | tojson' result_iterator = self._search(query) # Iterate over the events, storing them in a list and checking for any errors @@ -657,10 +667,15 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]: return self._notable_events # Search for all notable events from a single scheduled search (indicated by orig_sid) - query = ( - f'search index=notable search_name="{self.name}" [search index=notable search ' - f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson' - ) + if self.sid is None: + # query for validating detection is starting from a disabled state + query = ( + f'search index=notable search_name="{self.name}" [search index=notable search ' + f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson' + ) + else: + # query after the detection has been enabled and dispatched + query = f'search index=notable search_name="{self.name}" orig_sid="{self.sid}" | tojson' result_iterator = self._search(query) # Iterate over the events, storing them in a list and checking for any errors From 3bcb7488e0ced4877117951e2f95d28408490b54 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Mon, 23 Jun 2025 13:06:26 -0700 Subject: [PATCH 10/18] Set logging back to default --- contentctl/objects/correlation_search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 8b7999c6..242e9c36 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -34,7 +34,7 @@ from contentctl.objects.risk_event import RiskEvent # Suppress logging by default; enable for local testing -ENABLE_LOGGING = True +ENABLE_LOGGING = False LOG_LEVEL = logging.DEBUG LOG_PATH = "correlation_search.log" From 150c2096d45d7156d270642ebb376364df1f8653 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Tue, 24 Jun 2025 13:58:27 -0700 Subject: [PATCH 11/18] Refractor retry block structure --- contentctl/objects/correlation_search.py | 207 +++++++++++------------ 1 file changed, 100 insertions(+), 107 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 242e9c36..1ac3f63a 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -78,11 +78,6 @@ class TimeoutConfig(IntEnum): # Max amount to wait before timing out during exponential backoff MAX_SLEEP = 30 - # NOTE: Based on testing, 99% of detections will generate risk/notables within 30s, and the remaining 1% of - # detections may need up to 150s to finish; so this is a reasonable total maximum wait time - # Total wait time before giving up on waiting for risk/notables to be generated - TOTAL_MAX_WAIT = 180 - # NOTE: Based on testing, there is 1% detections couldn't generate risk/notables within single dispatch, and # they needed to be retried; 90s is a reasonable wait time before retrying dispatching the SavedSearch # Wait time before retrying dispatching the SavedSearch @@ -533,26 +528,6 @@ def update_timeframe( if refresh: self.refresh() - def force_run(self, refresh: bool = True) -> None: - """Forces a detection run - - Enables the detection, adjusts the cron schedule to run every 1 minute, and widens the earliest/latest window - to run on test data. - :param refresh: a bool indicating whether to refresh the metadata for the detection (default True) - """ - self.update_timeframe(refresh=False) - if not self.enabled: - self.enable(refresh=False) - job = self.dispatch() - self.logger.info( - f"Finished running detection '{self.name}' with job ID: {job.sid}" - ) - else: - self.logger.warning(f"Detection '{self.name}' was already enabled") - - if refresh: - self.refresh() - def risk_event_exists(self) -> bool: """Whether at least one matching risk event exists @@ -942,59 +917,93 @@ def notable_in_risk_dm(self) -> bool: return True return False - def validate_risk_notable_events(self) -> tuple[bool, str | None]: - """Validates the existence of risk and notable events - - Returns a bool indicating whether validating risks and notables is successful or not, - and a message indicating the reason for failure (if any). - - :return: True if validation passes, False if it fails; None if no message, or a string - message indicating the reason for failure + def validate_risk_notable_events(self) -> None: """ - try: - # Validate risk events - if self.has_risk_analysis_action: - self.logger.debug("Checking for matching risk events") - if self.risk_event_exists(): - # TODO (PEX-435): should this in the retry loop? or outside it? - # -> I've observed there being a missing risk event (15/16) on - # the first few tries, so this does help us check for true - # positives; BUT, if we have lots of failing detections, this - # will definitely add to the total wait time - # -> certain types of failures (e.g. risk message, or any value - # checking) should fail testing automatically - # -> other types, like those based on counts of risk events, - # should happen should fail more slowly as more events may be - # produced - self.validate_risk_events() - else: - raise ValidationFailed( - f"TEST FAILED: No matching risk event created for: {self.name}" - ) + Validate the risk and notable events created by the saved search + """ + # Validate risk events + if self.has_risk_analysis_action: + self.logger.debug("Checking for matching risk events") + if self.risk_event_exists(): + # TODO (PEX-435): should this in the retry loop? or outside it? + # -> I've observed there being a missing risk event (15/16) on + # the first few tries, so this does help us check for true + # positives; BUT, if we have lots of failing detections, this + # will definitely add to the total wait time + # -> certain types of failures (e.g. risk message, or any value + # checking) should fail testing automatically + # -> other types, like those based on counts of risk events, + # should happen should fail more slowly as more events may be + # produced + self.validate_risk_events() else: - self.logger.debug(f"No risk action defined for '{self.name}'") - - # Validate notable events - if self.has_notable_action: - self.logger.debug("Checking for matching notable events") - # NOTE: because we check this last, if both fail, the error message about notables will - # always be the last to be added and thus the one surfaced to the user - if self.notable_event_exists(): - # TODO (PEX-435): should this in the retry loop? or outside it? - self.validate_notable_events() - pass - else: - raise ValidationFailed( - f"TEST FAILED: No matching notable event created for: {self.name}" - ) + raise ValidationFailed( + f"TEST FAILED: No matching risk event created for: {self.name}" + ) + else: + self.logger.debug(f"No risk action defined for '{self.name}'") + + # Validate notable events + if self.has_notable_action: + self.logger.debug("Checking for matching notable events") + # NOTE: because we check this last, if both fail, the error message about notables will + # always be the last to be added and thus the one surfaced to the user + if self.notable_event_exists(): + # TODO (PEX-435): should this in the retry loop? or outside it? + self.validate_notable_events() + pass else: - self.logger.debug(f"No notable action defined for '{self.name}'") + raise ValidationFailed( + f"TEST FAILED: No matching notable event created for: {self.name}" + ) + else: + self.logger.debug(f"No notable action defined for '{self.name}'") + + def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: + """Dispatch the saved search and validate the risk/notable events + + Dispatches the saved search and validates the risk/notable events created by it. If any + validation fails, raises a ValidationFailed exception. + """ + self.dispatch() + + wait_time = TimeoutConfig.BASE_SLEEP + max_wait = TimeoutConfig.MAX_SLEEP + time_elapsed = 0 + validation_failed = False + + while time_elapsed <= TimeoutConfig.RETRY_DISPATCH: + start_time = time.time() + + # reset validation_failed for each iteration + validation_failed = False - return True, None - except ValidationFailed as e: - self.logger.error(f"Risk/notable validation failed: {e}") - return False, str(e) + # wait at least 30 seconds before adding to the wait time + if time_elapsed > TimeoutConfig.ADD_WAIT_TIME: + time.sleep(wait_time) + elapsed_sleep_time["elapsed_sleep_time"] += wait_time + wait_time = min(max_wait, wait_time * 2) + try: + self.validate_risk_notable_events() + except ValidationFailed as e: + self.logger.error(f"Validation failed: {e}") + validation_failed = True + + end_time = time.time() + time_elapsed += end_time - start_time + + if not validation_failed: + self.logger.info( + f"Validation passed for {self.name} after {elapsed_sleep_time['elapsed_sleep_time']} seconds" + ) + break + + if validation_failed: + raise ValidationFailed( + f"TEST FAILED: No matching notable event created for: {self.name}" + ) + # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls # it for completion, but that seems more tricky def test( @@ -1023,7 +1032,7 @@ def test( result: IntegrationTestResult | None = None # keep track of time slept and number of attempts for exponential backoff (base 2) - elapsed_sleep_time = 0 + elapsed_sleep_time = {'elapsed_sleep_time': 0} try: # first make sure the indexes are currently empty and the detection is starting from a disabled state @@ -1054,47 +1063,31 @@ def test( # force the detection to run self.logger.info(f"Forcing a run on {self.name}") self.update_pbar(TestingStates.FORCE_RUN) - self.force_run() - - max_total_wait = TimeoutConfig.TOTAL_MAX_WAIT - wait_time = TimeoutConfig.BASE_SLEEP - max_wait = TimeoutConfig.MAX_SLEEP - time_elapsed = 0 - - while time_elapsed <= max_total_wait: - # wait at least 90 seconds to rerun the SavedSearch - if time_elapsed > TimeoutConfig.RETRY_DISPATCH: - self.dispatch() - - start_time = time.time() - - # wait at least 30 seconds before adding to the wait time - if time_elapsed > TimeoutConfig.ADD_WAIT_TIME: - time.sleep(wait_time) - elapsed_sleep_time += wait_time - wait_time = min(max_wait, wait_time * 2) + self.update_timeframe(refresh=False) + self.enable(refresh=False) + attempt = 1 + while attempt <= 3: # reset the result to None on each loop iteration result = None - validate_pass, error = self.validate_risk_notable_events() - - # if result is True, then all checks passed and we can break the loop - if validate_pass: + attempt += 1 + try: + self.dispatch_and_validate(elapsed_sleep_time) + except ValidationFailed as e: + self.logger.error(f"Risk/notable validation failed: {e}") + result = IntegrationTestResult( + status=TestResultStatus.FAIL, + message=f"TEST FAILED: {e}", + wait_duration=elapsed_sleep_time["elapsed_sleep_time"], + ) + if result is None: result = IntegrationTestResult( status=TestResultStatus.PASS, message=f"TEST PASSED: Expected risk and/or notable events were created for: {self.name}", - wait_duration=elapsed_sleep_time, + wait_duration=elapsed_sleep_time["elapsed_sleep_time"], ) break - else: - result = IntegrationTestResult( - status=TestResultStatus.FAIL, - message=f"TEST FAILED: {error}", - ) - - end_time = time.time() - time_elapsed += end_time - start_time # TODO (PEX-436): should cleanup be in a finally block so it runs even on exception? # cleanup the created events, disable the detection and return the result @@ -1106,7 +1099,7 @@ def test( result = IntegrationTestResult( status=TestResultStatus.ERROR, message=f"TEST FAILED (ERROR): Exception raised during integration test: {e}", - wait_duration=elapsed_sleep_time, + wait_duration=elapsed_sleep_time["elapsed_sleep_time"], exception=e, ) self.logger.exception(result.message) # type: ignore From bf024ccdbc5b0b42f8ed92175d298aae6d625c5b Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Tue, 24 Jun 2025 14:09:53 -0700 Subject: [PATCH 12/18] Update comment and variable names to be more descriptive --- contentctl/objects/correlation_search.py | 54 ++++++++++++------------ 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 1ac3f63a..eef4d991 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -68,25 +68,22 @@ class TimeoutConfig(IntEnum): Configuration values for the exponential backoff timer """ + # NOTE: Some detections take longer to generate their risk/notables than other; testing has + # shown that in a single run, 99% detections could generate risk/notables within 30s and less than 1% + # detections (20 to 30 detections) would need 60 to 90s to wait for risk/notables. + # base amount to sleep for before beginning exponential backoff during testing BASE_SLEEP = 2 - # NOTE: Some detections take longer to generate their risk/notables than other; testing has - # shown 30s to likely be sufficient for all detections in 99% of runs and less than 1% of detections - # would need 60s and 90s to wait for risk/notables; therefore 30s is a reasonable interval for max - # wait time. - # Max amount to wait before timing out during exponential backoff - MAX_SLEEP = 30 + # Max amount to wait before timing out during exponential backoff in each iteration + MAX_SLEEP_PER_TRY = 30 - # NOTE: Based on testing, there is 1% detections couldn't generate risk/notables within single dispatch, and + # NOTE: Based on testing, there are 45 detections couldn't generate risk/notables within single dispatch, and # they needed to be retried; 90s is a reasonable wait time before retrying dispatching the SavedSearch # Wait time before retrying dispatching the SavedSearch RETRY_DISPATCH = 90 - # NOTE: Based on testing, 99% of detections will generate risk/notables within 30s, and the validation of risks - # and notables would take around 5 to 10 seconds; so before adding additional wait time, we let the validation - # process work as the default wait time until we reach the ADD_WAIT_TIME and add additional wait time - # Time elased before adding additional wait time + # Time elapsed before adding additional wait time ADD_WAIT_TIME = 30 @@ -464,10 +461,10 @@ def dispatch(self) -> splunklib.Job: time_to_execute = 0 # Check if the job is finished while not job.is_done(): - self.logger.info(f"Job {job.sid} is still running...") + self.logger.debug(f"Job {job.sid} is still running...") time.sleep(1) time_to_execute += 1 - self.logger.info( + self.logger.debug( f"Job {job.sid} has finished running in {time_to_execute} seconds." ) @@ -917,7 +914,7 @@ def notable_in_risk_dm(self) -> bool: return True return False - def validate_risk_notable_events(self) -> None: + def validate_ara_events(self) -> None: """ Validate the risk and notable events created by the saved search """ @@ -968,12 +965,12 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: self.dispatch() wait_time = TimeoutConfig.BASE_SLEEP - max_wait = TimeoutConfig.MAX_SLEEP + max_wait = TimeoutConfig.MAX_SLEEP_PER_TRY time_elapsed = 0 validation_failed = False while time_elapsed <= TimeoutConfig.RETRY_DISPATCH: - start_time = time.time() + validation_start_time = time.time() # reset validation_failed for each iteration validation_failed = False @@ -985,29 +982,31 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: wait_time = min(max_wait, wait_time * 2) try: - self.validate_risk_notable_events() + self.validate_ara_events() except ValidationFailed as e: self.logger.error(f"Validation failed: {e}") validation_failed = True - - end_time = time.time() - time_elapsed += end_time - start_time - + # break out of the loop if validation passes if not validation_failed: self.logger.info( f"Validation passed for {self.name} after {elapsed_sleep_time['elapsed_sleep_time']} seconds" ) break + validation_end_time = time.time() + time_elapsed += validation_end_time - validation_start_time + if validation_failed: raise ValidationFailed( f"TEST FAILED: No matching notable event created for: {self.name}" ) - + # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls # it for completion, but that seems more tricky def test( - self, max_sleep: int = TimeoutConfig.MAX_SLEEP, raise_on_exc: bool = False + self, + max_sleep: int = TimeoutConfig.MAX_SLEEP_PER_TRY, + raise_on_exc: bool = False, ) -> IntegrationTestResult: """Execute the integration test @@ -1015,13 +1014,12 @@ def test( and clear the indexes if so. Then, we force a run of the detection, wait for `sleep` seconds, and finally we validate that the appropriate risk/notable events seem to have been created. NOTE: assumes the data already exists in the instance - :param max_sleep: max number of seconds to sleep for after enabling the detection before we check for created - events; re-checks are made upon failures using an exponential backoff until the max is reached + :param max_sleep: max number of seconds to sleep in each iteration for after enabling the detection before we + check for created events; re-checks are made upon failures using an exponential backoff until the max is reached :param raise_on_exc: bool flag indicating if an exception should be raised when caught by the test routine, or if the error state should just be recorded for the test """ - # max_sleep must be greater than the base value we must wait for the scheduled searchjob to run (jobs run every - # 60s) + # max_sleep must be greater than the base value if max_sleep < TimeoutConfig.BASE_SLEEP: raise ClientError( f"max_sleep value of {max_sleep} is less than the base sleep required " @@ -1032,7 +1030,7 @@ def test( result: IntegrationTestResult | None = None # keep track of time slept and number of attempts for exponential backoff (base 2) - elapsed_sleep_time = {'elapsed_sleep_time': 0} + elapsed_sleep_time = {"elapsed_sleep_time": 0} try: # first make sure the indexes are currently empty and the detection is starting from a disabled state From 842125c2780a24b4671f8f5031645059899ce69e Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Tue, 24 Jun 2025 14:23:46 -0700 Subject: [PATCH 13/18] Update search query for risk datamodel --- contentctl/objects/correlation_search.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index eef4d991..17f3b48d 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -556,8 +556,7 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: ) return self._risk_events - # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID - # Search for all risk events from a single scheduled search (indicated by orig_sid) + # Search for all risk events from a single search (indicated by orig_sid) if self.sid is None: # query for validating detection is starting from a disabled state query = ( @@ -638,7 +637,7 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]: ) return self._notable_events - # Search for all notable events from a single scheduled search (indicated by orig_sid) + # Search for all notable events from a single search (indicated by orig_sid) if self.sid is None: # query for validating detection is starting from a disabled state query = ( @@ -721,14 +720,20 @@ def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEve ) return self._risk_dm_events - # TODO (#248): Refactor risk/notable querying to pin to a single savedsearch ID # Search for all risk data model events from a single scheduled search (indicated by # orig_sid) - query = ( - f'datamodel Risk All_Risk flat | search search_name="{self.name}" [datamodel Risk ' - f'All_Risk flat | search search_name="{self.name}" | tail 1 | fields orig_sid] ' - "| tojson" - ) + if self.sid is None: + # query for validating detection is starting from a disabled state + query = ( + f'datamodel Risk All_Risk flat | search search_name="{self.name}" ' + f'[search datamodel Risk All_Risk flat | search search_name="{self.name}" ' + "| tail 1 | fields orig_sid] | tojson" + ) + else: + query = ( + f'datamodel Risk All_Risk flat | search search_name="{self.name}" orig_sid="{self.sid}" ' + "| tojson" + ) result_iterator = self._search(query) # Iterate over the events, storing them in a list and checking for any errors From 1a8ff7eeebf5f7c953c14f7f2aa2482159b8efea Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Tue, 24 Jun 2025 14:28:03 -0700 Subject: [PATCH 14/18] Update comment --- contentctl/objects/correlation_search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 17f3b48d..7155b0ca 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -720,7 +720,7 @@ def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEve ) return self._risk_dm_events - # Search for all risk data model events from a single scheduled search (indicated by + # Search for all risk data model events from a single search (indicated by # orig_sid) if self.sid is None: # query for validating detection is starting from a disabled state From 7a33f574413ddc1f15166f57ff8df1008e6c169c Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Thu, 26 Jun 2025 12:00:39 -0700 Subject: [PATCH 15/18] Update comment and error raising --- contentctl/objects/correlation_search.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 7155b0ca..4e9a112b 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -921,7 +921,8 @@ def notable_in_risk_dm(self) -> bool: def validate_ara_events(self) -> None: """ - Validate the risk and notable events created by the saved search + Validate the risk and notable events created by the saved search. + An exception is raised if the validation fails for either risk or notable events. """ # Validate risk events if self.has_risk_analysis_action: @@ -972,15 +973,15 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: wait_time = TimeoutConfig.BASE_SLEEP max_wait = TimeoutConfig.MAX_SLEEP_PER_TRY time_elapsed = 0 - validation_failed = False + validation_error = None while time_elapsed <= TimeoutConfig.RETRY_DISPATCH: validation_start_time = time.time() - # reset validation_failed for each iteration - validation_failed = False + # reset validation_error for each iteration + validation_error = None - # wait at least 30 seconds before adding to the wait time + # wait at least 30 seconds before adding to the wait time (we expect the vast majority of detections to show results w/in that window) if time_elapsed > TimeoutConfig.ADD_WAIT_TIME: time.sleep(wait_time) elapsed_sleep_time["elapsed_sleep_time"] += wait_time @@ -990,9 +991,9 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: self.validate_ara_events() except ValidationFailed as e: self.logger.error(f"Validation failed: {e}") - validation_failed = True + validation_error = e # break out of the loop if validation passes - if not validation_failed: + if validation_error is None: self.logger.info( f"Validation passed for {self.name} after {elapsed_sleep_time['elapsed_sleep_time']} seconds" ) @@ -1001,10 +1002,8 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: validation_end_time = time.time() time_elapsed += validation_end_time - validation_start_time - if validation_failed: - raise ValidationFailed( - f"TEST FAILED: No matching notable event created for: {self.name}" - ) + if validation_error is not None: + raise validation_error # NOTE: it would be more ideal to switch this to a system which gets the handle of the saved search job and polls # it for completion, but that seems more tricky From 52089310b8b2bed1b6d0d5d6515328f825da9562 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Tue, 8 Jul 2025 15:14:29 -0700 Subject: [PATCH 16/18] Update docstring --- contentctl/objects/correlation_search.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 4e9a112b..b7998edd 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -923,6 +923,8 @@ def validate_ara_events(self) -> None: """ Validate the risk and notable events created by the saved search. An exception is raised if the validation fails for either risk or notable events. + + :raises ValidationFailed: If the expected risk events are not found or validation fails. """ # Validate risk events if self.has_risk_analysis_action: @@ -967,6 +969,11 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: Dispatches the saved search and validates the risk/notable events created by it. If any validation fails, raises a ValidationFailed exception. + + :param elapsed_sleep_time: Dictionary tracking the total elapsed sleep time across retries. + :type elapsed_sleep_time: dict[str, int] + + :raises ValidationFailed: If validation of risk/notable events fails after all retries. """ self.dispatch() From 4b413adac267f815c733acd953daa3ef0cdd8a20 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Tue, 8 Jul 2025 15:45:07 -0700 Subject: [PATCH 17/18] Remove cleanup --- contentctl/objects/correlation_search.py | 49 +++--------------------- 1 file changed, 6 insertions(+), 43 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index b7998edd..8665ee2a 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -557,15 +557,7 @@ def get_risk_events(self, force_update: bool = False) -> list[RiskEvent]: return self._risk_events # Search for all risk events from a single search (indicated by orig_sid) - if self.sid is None: - # query for validating detection is starting from a disabled state - query = ( - f'search index=risk search_name="{self.name}" [search index=risk search ' - f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson' - ) - else: - # query after the detection has been enabled and dispatched - query = f'search index=risk search_name="{self.name}" orig_sid="{self.sid}" | tojson' + query = f'search index=risk search_name="{self.name}" orig_sid="{self.sid}" | tojson' result_iterator = self._search(query) # Iterate over the events, storing them in a list and checking for any errors @@ -638,15 +630,7 @@ def get_notable_events(self, force_update: bool = False) -> list[NotableEvent]: return self._notable_events # Search for all notable events from a single search (indicated by orig_sid) - if self.sid is None: - # query for validating detection is starting from a disabled state - query = ( - f'search index=notable search_name="{self.name}" [search index=notable search ' - f'search_name="{self.name}" | tail 1 | fields orig_sid] | tojson' - ) - else: - # query after the detection has been enabled and dispatched - query = f'search index=notable search_name="{self.name}" orig_sid="{self.sid}" | tojson' + query = f'search index=notable search_name="{self.name}" orig_sid="{self.sid}" | tojson' result_iterator = self._search(query) # Iterate over the events, storing them in a list and checking for any errors @@ -722,18 +706,10 @@ def get_risk_dm_events(self, force_update: bool = False) -> list[BaseSecurityEve # Search for all risk data model events from a single search (indicated by # orig_sid) - if self.sid is None: - # query for validating detection is starting from a disabled state - query = ( - f'datamodel Risk All_Risk flat | search search_name="{self.name}" ' - f'[search datamodel Risk All_Risk flat | search search_name="{self.name}" ' - "| tail 1 | fields orig_sid] | tojson" - ) - else: - query = ( - f'datamodel Risk All_Risk flat | search search_name="{self.name}" orig_sid="{self.sid}" ' - "| tojson" - ) + query = ( + f'datamodel Risk All_Risk flat | search search_name="{self.name}" orig_sid="{self.sid}" ' + "| tojson" + ) result_iterator = self._search(query) # Iterate over the events, storing them in a list and checking for any errors @@ -1044,19 +1020,6 @@ def test( elapsed_sleep_time = {"elapsed_sleep_time": 0} try: - # first make sure the indexes are currently empty and the detection is starting from a disabled state - self.logger.debug("Cleaning up any pre-existing risk/notable events...") - self.update_pbar(TestingStates.PRE_CLEANUP) - if self.risk_event_exists(): - self.logger.warning( - f"Risk events matching '{self.name}' already exist; marking for deletion" - ) - if self.notable_event_exists(): - self.logger.warning( - f"Notable events matching '{self.name}' already exist; marking for deletion" - ) - self.cleanup() - # skip test if no risk or notable action defined if not self.has_risk_analysis_action and not self.has_notable_action: message = ( From 290a5f49b87c2b76dceb8c50c3811e78a01e5762 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Tue, 8 Jul 2025 15:57:02 -0700 Subject: [PATCH 18/18] Update wait_time assign logic --- contentctl/objects/correlation_search.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/contentctl/objects/correlation_search.py b/contentctl/objects/correlation_search.py index 8665ee2a..ec8cb609 100644 --- a/contentctl/objects/correlation_search.py +++ b/contentctl/objects/correlation_search.py @@ -75,9 +75,6 @@ class TimeoutConfig(IntEnum): # base amount to sleep for before beginning exponential backoff during testing BASE_SLEEP = 2 - # Max amount to wait before timing out during exponential backoff in each iteration - MAX_SLEEP_PER_TRY = 30 - # NOTE: Based on testing, there are 45 detections couldn't generate risk/notables within single dispatch, and # they needed to be retried; 90s is a reasonable wait time before retrying dispatching the SavedSearch # Wait time before retrying dispatching the SavedSearch @@ -954,7 +951,6 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: self.dispatch() wait_time = TimeoutConfig.BASE_SLEEP - max_wait = TimeoutConfig.MAX_SLEEP_PER_TRY time_elapsed = 0 validation_error = None @@ -968,7 +964,9 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: if time_elapsed > TimeoutConfig.ADD_WAIT_TIME: time.sleep(wait_time) elapsed_sleep_time["elapsed_sleep_time"] += wait_time - wait_time = min(max_wait, wait_time * 2) + wait_time = min( + TimeoutConfig.RETRY_DISPATCH - int(time_elapsed), wait_time * 2 + ) try: self.validate_ara_events() @@ -992,7 +990,6 @@ def dispatch_and_validate(self, elapsed_sleep_time: dict[str, int]) -> None: # it for completion, but that seems more tricky def test( self, - max_sleep: int = TimeoutConfig.MAX_SLEEP_PER_TRY, raise_on_exc: bool = False, ) -> IntegrationTestResult: """Execute the integration test @@ -1001,17 +998,9 @@ def test( and clear the indexes if so. Then, we force a run of the detection, wait for `sleep` seconds, and finally we validate that the appropriate risk/notable events seem to have been created. NOTE: assumes the data already exists in the instance - :param max_sleep: max number of seconds to sleep in each iteration for after enabling the detection before we - check for created events; re-checks are made upon failures using an exponential backoff until the max is reached :param raise_on_exc: bool flag indicating if an exception should be raised when caught by the test routine, or if the error state should just be recorded for the test """ - # max_sleep must be greater than the base value - if max_sleep < TimeoutConfig.BASE_SLEEP: - raise ClientError( - f"max_sleep value of {max_sleep} is less than the base sleep required " - f"({TimeoutConfig.BASE_SLEEP})" - ) # initialize result as None result: IntegrationTestResult | None = None