From ab92eb07f672655fbcc94e1031dd62ebec35b742 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Wed, 15 Oct 2025 15:26:41 -0700 Subject: [PATCH 1/9] Update search query to KVStore --- .../objects/content_versioning_service.py | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 68a529ba..449bee59 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -17,7 +17,7 @@ from contentctl.objects.detection import Detection # Suppress logging by default; enable for local testing -ENABLE_LOGGING = False +ENABLE_LOGGING = True LOG_LEVEL = logging.DEBUG LOG_PATH = "content_versioning_service.log" @@ -186,7 +186,7 @@ def cms_fields(self) -> list[str]: "app_name", "detection_id", "version", - "action.correlationsearch.label", + "content", "sourcetype", ] @@ -293,8 +293,8 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: # Construct the query looking for CMS events matching the content app name query = ( - f"search index=cms_main sourcetype=stash_common_detection_model " - f'app_name="{self.global_config.app.appid}" | fields {", ".join(self.cms_fields)}' + f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}" + f"| fields {', '.join(self.cms_fields)}" ) self.logger.debug( f"[{self.infrastructure.instance_name}] Query on cms_main: {query}" @@ -302,6 +302,27 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: # Get the job as a blocking operation, set the cache, and return self._cms_main_job = self.service.search(query, exec_mode="blocking") # type: ignore + result_count = int(self._cms_main_job["resultCount"]) + self.logger.debug( + f"[TESTING DEBUG INFO] cms_content_lookup search returned {result_count} results" + ) + + # Log a sample of the actual results (first 3) + if result_count > 0: + sample_results = [] + iterator = ResultIterator( + response_reader=self._cms_main_job.results( + output_mode="json", count=3, offset=0 + ), # type: ignore + error_filters=[], + ) + for result in iterator: + sample_results.append(result) + + self.logger.debug( + f"[TESTING DEBUG INFO] Sample results (first {len(sample_results)}): " + f"{json.dumps(sample_results, indent=2)}" + ) return self._cms_main_job def get_num_cms_events(self, use_cache: bool = False) -> int: @@ -376,7 +397,15 @@ def validate_content_against_cms(self) -> None: offset += 1 # Get the name of the search in the CMS event - cms_entry_name = cms_event["action.correlationsearch.label"] + content = json.loads(cms_event["content"]) + self.logger.debug( + f"[TESTING DEBUG INFO] CMS Event content: {cms_event['content']}" + ) + self.logger.debug( + f"[TESTING DEBUG INFO] CMS Event content after json load: {type(content)}" + ) + # Get the name of the search in the CMS event + cms_entry_name = content["action.correlationsearch.label"] self.logger.info( f"[{self.infrastructure.instance_name}] {offset}: Matching cms_main entry " f"'{cms_entry_name}' against detections" @@ -477,12 +506,14 @@ def validate_detection_against_cms_event( self.global_config.app ) + content = json.loads(cms_event["content"]) + cms_entry_name = content["action.correlationsearch.label"] + # Compare the correlation search label - if cms_event["action.correlationsearch.label"] != rule_name_from_detection: + if cms_entry_name != rule_name_from_detection: msg = ( f"[{self.infrastructure.instance_name}][{detection.name}]: Correlation search " - f"label in cms_event ('{cms_event['action.correlationsearch.label']}') does not " - "match detection name" + f"label in cms_event ('{cms_entry_name}') does not match detection name" ) self.logger.error(msg) return Exception(msg) From 0d649d7fce4616666dba4abbffb7378c1265fa1d Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Mon, 20 Oct 2025 12:09:40 -0700 Subject: [PATCH 2/9] Add version based validation --- .../objects/content_versioning_service.py | 117 +++++++++++++++--- 1 file changed, 97 insertions(+), 20 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 449bee59..beca2f2b 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -8,6 +8,7 @@ import splunklib.client as splunklib # type: ignore from pydantic import BaseModel, Field, PrivateAttr, computed_field +from semantic_version import Version from splunklib.binding import HTTPError, ResponseReader # type: ignore from splunklib.data import Record # type: ignore @@ -21,6 +22,9 @@ LOG_LEVEL = logging.DEBUG LOG_PATH = "content_versioning_service.log" +# The app name of ES; needed to check ES version +ES_APP_NAME = "SplunkEnterpriseSecuritySuite" + class ContentVersioningService(BaseModel): """ @@ -77,6 +81,47 @@ def setup_functions(self) -> list[tuple[Callable[[], None], str]]: (self.validate_content_against_cms, "Validating Against CMS"), ] + @cached_property + def es_version(self) -> Version | None: + """ + Returns the version of Enterprise Security installed on the instance; None if not installed. + + :return: the version of ES, as a semver aware object + :rtype: :class:`semantic_version.Version` + """ + if ES_APP_NAME not in self.service.apps: + return None + return Version(self.service.apps[ES_APP_NAME]["version"]) # type: ignore + + @cached_property + def kvstore_content_versioning(self) -> bool: + """ + Indicates whether we should test content versioning based on kvstore logic. Content versioning + should be tested with kvstore logic when ES is at least version 8.3.0. + + :return: a bool indicating whether we should test content versioning with kvstore logic + :rtype: bool + """ + es_version = self.es_version + return es_version is not None and es_version >= Version("8.3.0") + + @cached_property + def datastore_content_versioning(self) -> bool: + """ + Indicates whether we should test content versioning based on datastore logic. Content versioning + should be tested with datastore logic when ES is less than version 8.3.0 but greater than or equal + to version 8.0.0. + + :return: a bool indicating whether we should test content versioning with datastore logic + :rtype: bool + """ + es_version = self.es_version + return ( + es_version is not None + and es_version >= Version("8.0.0") + and es_version < Version("8.3.0") + ) + def _query_content_versioning_service( self, method: str, body: dict[str, Any] = {} ) -> Record: @@ -182,13 +227,43 @@ def cms_fields(self) -> list[str]: :returns: a list of strings, the fields we want :rtype: list[str] """ - return [ - "app_name", - "detection_id", - "version", - "content", - "sourcetype", - ] + if self.kvstore_content_versioning: + return [ + "app_name", + "detection_id", + "version", + "content", + "sourcetype", + ] + elif self.datastore_content_versioning: + return [ + "app_name", + "detection_id", + "version", + "action.correlationsearch.label", + "sourcetype", + ] + raise Exception(f"Something went wrong with es version: {self.es_version}") + + def _get_cms_entry_label(self, cms_event: dict[str, Any]) -> str: + """ + Extracts the correlation search label from a CMS event. + Handles both old (< 8.3.0) and new (>= 8.3.0) CMS lookup structures. + + :param cms_event: The event from the cms_main index + :type cms_event: dict[str, Any] + :return: The correlation search label + :rtype: str + """ + + if self.kvstore_content_versioning: + # ES 8.3.0+: Parse content JSON to get label + content = json.loads(cms_event["content"]) + return content["action.correlationsearch.label"] + elif self.datastore_content_versioning: + # ES < 8.3.0 and ES >= 8.0.0: Label is at top level + return cms_event["action.correlationsearch.label"] + raise Exception("Something went wrong with es version: {self.es_version}") @property def is_cms_parser_enabled(self) -> bool: @@ -292,10 +367,18 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: ) # Construct the query looking for CMS events matching the content app name - query = ( - f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}" - f"| fields {', '.join(self.cms_fields)}" - ) + if self.kvstore_content_versioning: + query = ( + f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}" + f"| fields {', '.join(self.cms_fields)}" + ) + elif self.datastore_content_versioning: + query = ( + f"search index=cms_main sourcetype=stash_common_detection_model " + f'app_name="{self.global_config.app.appid}" | fields {", ".join(self.cms_fields)}' + ) + else: + raise Exception("Something went wrong with es version: {self.es_version}") self.logger.debug( f"[{self.infrastructure.instance_name}] Query on cms_main: {query}" ) @@ -397,15 +480,10 @@ def validate_content_against_cms(self) -> None: offset += 1 # Get the name of the search in the CMS event - content = json.loads(cms_event["content"]) - self.logger.debug( - f"[TESTING DEBUG INFO] CMS Event content: {cms_event['content']}" - ) + cms_entry_name = self._get_cms_entry_label(cms_event) self.logger.debug( - f"[TESTING DEBUG INFO] CMS Event content after json load: {type(content)}" + f"[TESTING DEBUG INFO] CMS Event content: {cms_entry_name}" ) - # Get the name of the search in the CMS event - cms_entry_name = content["action.correlationsearch.label"] self.logger.info( f"[{self.infrastructure.instance_name}] {offset}: Matching cms_main entry " f"'{cms_entry_name}' against detections" @@ -506,8 +584,7 @@ def validate_detection_against_cms_event( self.global_config.app ) - content = json.loads(cms_event["content"]) - cms_entry_name = content["action.correlationsearch.label"] + cms_entry_name = self._get_cms_entry_label(cms_event) # Compare the correlation search label if cms_entry_name != rule_name_from_detection: From d2c433673a10e0897bc8838574340787a8f889ed Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Thu, 23 Oct 2025 11:36:21 -0700 Subject: [PATCH 3/9] Add new endpoint to validate versioning active --- .../objects/content_versioning_service.py | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index beca2f2b..9797201e 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -142,12 +142,19 @@ def _query_content_versioning_service( # Query the content versioning service try: - response = self.service.request( # type: ignore - method=method, - path_segment="configs/conf-feature_flags/general", - body=body, - app="SA-ContentVersioning", - ) + if method == "GET" and self.kvstore_content_versioning: + response = self.service.request( + method=method, + path_segment="content_versioning/versioning_apps", + app="SA-ContentVersioning", + ) + if self.datastore_content_versioning: + response = self.service.request( # type: ignore + method=method, + path_segment="configs/conf-feature_flags/general", + body=body, + app="SA-ContentVersioning", + ) except HTTPError as e: # Raise on any HTTP errors raise HTTPError(f"Error querying content versioning service: {e}") from e @@ -186,9 +193,20 @@ def is_versioning_activated(self) -> bool: # Find the versioning_activated field and report any errors try: - for entry in data["entry"]: - if entry["name"] == "general": - return bool(int(entry["content"]["versioning_activated"])) + if self.kvstore_content_versioning: + if "content" in data: + for app in data["content"]: + if ( + app.get("name") == "DA-ESS-ContentUpdate" + and app.get("status") == "active" + ): + return True + else: + return False + if self.datastore_content_versioning: + for entry in data["entry"]: + if entry["name"] == "general": + return bool(int(entry["content"]["versioning_activated"])) except KeyError as e: raise KeyError( "Cannot retrieve versioning status, unable to determine versioning status using " @@ -204,9 +222,19 @@ def activate_versioning(self) -> None: Activate the content versioning service """ # Post to the SA-ContentVersioning service to set versioning status - self._query_content_versioning_service( - method="POST", body={"versioning_activated": True} - ) + if self.datastore_content_versioning: + self._query_content_versioning_service( + method="POST", body={"versioning_activated": True} + ) + + # Wait for versioning to be activated for ES 8.3.0+ + if self.kvstore_content_versioning: + timeout = 600 + while not self.is_versioning_activated: + time.sleep(60) + timeout -= 60 + if timeout <= 0: + break # Confirm versioning has been enabled if not self.is_versioning_activated: From 0aeebfd899226fc9c2e43994c556fbe0d1170cf0 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Thu, 23 Oct 2025 13:04:39 -0700 Subject: [PATCH 4/9] Create CMSEvent model --- .../objects/content_versioning_service.py | 127 ++++++++++-------- 1 file changed, 70 insertions(+), 57 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 9797201e..084686e6 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -7,7 +7,13 @@ from typing import Any, Callable import splunklib.client as splunklib # type: ignore -from pydantic import BaseModel, Field, PrivateAttr, computed_field +from pydantic import ( + BaseModel, + Field, + PrivateAttr, + computed_field, + model_validator, +) from semantic_version import Version from splunklib.binding import HTTPError, ResponseReader # type: ignore from splunklib.data import Record # type: ignore @@ -26,6 +32,55 @@ ES_APP_NAME = "SplunkEnterpriseSecuritySuite" +class CMSEvent(BaseModel): + """ + A model representing a CMS event. This is used to validate that detections have been installed + in a way that is compatible with content versioning. + """ + + content: str # JSON string + + # The app name of the detection + app_name: str + + # The detection id of the detection + detection_id: str + + # The version of the detection + version: str + + # The saved search name of the detection + action_correlationsearch_label: str + + @model_validator(mode="before") + @classmethod + def extract_from_content(cls, data): + """Extract fields from content JSON if not already provided""" + if isinstance(data, dict) and "content" in data: + try: + content_str = data.get("content") + parsed = json.loads(content_str) + + # Extract metadata fields - note the key has dots in it + metadata_str = parsed.get("action.correlationsearch.metadata", {}) + metadata = ( + json.loads(metadata_str) + if isinstance(metadata_str, str) + else metadata_str + ) + data.setdefault("app_name", metadata.get("app_name")) + data.setdefault("detection_id", metadata.get("detection_id")) + data.setdefault("version", metadata.get("version")) + data.setdefault( + "action_correlationsearch_label", + parsed.get("action.correlationsearch.label"), + ) + except (json.JSONDecodeError, AttributeError, KeyError, TypeError): + # If parsing fails, let Pydantic handle validation errors + raise ValueError("Failed to parse content JSON {}".format(data)) + return data + + class ContentVersioningService(BaseModel): """ A model representing the content versioning service used in ES 8.0.0+. This model can be used @@ -246,53 +301,6 @@ def activate_versioning(self) -> None: f"[{self.infrastructure.instance_name}] Versioning service successfully activated" ) - @computed_field - @cached_property - def cms_fields(self) -> list[str]: - """ - Property listing the fields we want to pull from the cms_main index - - :returns: a list of strings, the fields we want - :rtype: list[str] - """ - if self.kvstore_content_versioning: - return [ - "app_name", - "detection_id", - "version", - "content", - "sourcetype", - ] - elif self.datastore_content_versioning: - return [ - "app_name", - "detection_id", - "version", - "action.correlationsearch.label", - "sourcetype", - ] - raise Exception(f"Something went wrong with es version: {self.es_version}") - - def _get_cms_entry_label(self, cms_event: dict[str, Any]) -> str: - """ - Extracts the correlation search label from a CMS event. - Handles both old (< 8.3.0) and new (>= 8.3.0) CMS lookup structures. - - :param cms_event: The event from the cms_main index - :type cms_event: dict[str, Any] - :return: The correlation search label - :rtype: str - """ - - if self.kvstore_content_versioning: - # ES 8.3.0+: Parse content JSON to get label - content = json.loads(cms_event["content"]) - return content["action.correlationsearch.label"] - elif self.datastore_content_versioning: - # ES < 8.3.0 and ES >= 8.0.0: Label is at top level - return cms_event["action.correlationsearch.label"] - raise Exception("Something went wrong with es version: {self.es_version}") - @property def is_cms_parser_enabled(self) -> bool: """ @@ -398,12 +406,12 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: if self.kvstore_content_versioning: query = ( f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}" - f"| fields {', '.join(self.cms_fields)}" + f"| fields content" ) elif self.datastore_content_versioning: query = ( f"search index=cms_main sourcetype=stash_common_detection_model " - f'app_name="{self.global_config.app.appid}" | fields {", ".join(self.cms_fields)}' + f'app_name="{self.global_config.app.appid}" | fields _raw' ) else: raise Exception("Something went wrong with es version: {self.es_version}") @@ -507,8 +515,13 @@ def validate_content_against_cms(self) -> None: # Increment the offset for each result offset += 1 + if self.kvstore_content_versioning: + cms_event = CMSEvent(content=cms_event["content"]) + elif self.datastore_content_versioning: + cms_event = CMSEvent(content=cms_event["_raw"]) + # Get the name of the search in the CMS event - cms_entry_name = self._get_cms_entry_label(cms_event) + cms_entry_name = cms_event.action_correlationsearch_label self.logger.debug( f"[TESTING DEBUG INFO] CMS Event content: {cms_entry_name}" ) @@ -591,14 +604,14 @@ def validate_content_against_cms(self) -> None: ) def validate_detection_against_cms_event( - self, cms_event: dict[str, Any], detection: Detection + self, cms_event: CMSEvent, detection: Detection ) -> Exception | None: """ Given an event from the cms_main index and the matched detection, compare fields and look for any inconsistencies :param cms_event: The event from the cms_main index - :type cms_event: dict[str, Any] + :type cms_event: CMSEvent :param detection: The matched detection :type detection: :class:`contentctl.objects.detection.Detection` @@ -607,12 +620,12 @@ def validate_detection_against_cms_event( """ # TODO (PEX-509): validate additional fields between the cms_event and the detection - cms_uuid = uuid.UUID(cms_event["detection_id"]) + cms_uuid = uuid.UUID(cms_event.detection_id) rule_name_from_detection = detection.get_action_dot_correlationsearch_dot_label( self.global_config.app ) - cms_entry_name = self._get_cms_entry_label(cms_event) + cms_entry_name = cms_event.action_correlationsearch_label # Compare the correlation search label if cms_entry_name != rule_name_from_detection: @@ -630,12 +643,12 @@ def validate_detection_against_cms_event( ) self.logger.error(msg) return Exception(msg) - elif cms_event["version"] != f"{detection.version}.1": + elif cms_event.version != f"{detection.version}.1": # Compare the versions (we append '.1' to the detection version to be in line w/ the # internal representation in ES) msg = ( f"[{self.infrastructure.instance_name}] [{detection.name}]: Version in cms_event " - f"('{cms_event['version']}') does not match version in detection " + f"('{cms_event.version}') does not match version in detection " f"('{detection.version}.1')" ) self.logger.error(msg) From df40d1613882993e32a754c06d775e578226c848 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Thu, 23 Oct 2025 13:05:49 -0700 Subject: [PATCH 5/9] Turn off logger and clear debug logs --- contentctl/objects/content_versioning_service.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 084686e6..899bb8a1 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -24,7 +24,7 @@ from contentctl.objects.detection import Detection # Suppress logging by default; enable for local testing -ENABLE_LOGGING = True +ENABLE_LOGGING = False LOG_LEVEL = logging.DEBUG LOG_PATH = "content_versioning_service.log" @@ -422,9 +422,6 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: # Get the job as a blocking operation, set the cache, and return self._cms_main_job = self.service.search(query, exec_mode="blocking") # type: ignore result_count = int(self._cms_main_job["resultCount"]) - self.logger.debug( - f"[TESTING DEBUG INFO] cms_content_lookup search returned {result_count} results" - ) # Log a sample of the actual results (first 3) if result_count > 0: @@ -438,10 +435,6 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: for result in iterator: sample_results.append(result) - self.logger.debug( - f"[TESTING DEBUG INFO] Sample results (first {len(sample_results)}): " - f"{json.dumps(sample_results, indent=2)}" - ) return self._cms_main_job def get_num_cms_events(self, use_cache: bool = False) -> int: @@ -522,9 +515,6 @@ def validate_content_against_cms(self) -> None: # Get the name of the search in the CMS event cms_entry_name = cms_event.action_correlationsearch_label - self.logger.debug( - f"[TESTING DEBUG INFO] CMS Event content: {cms_entry_name}" - ) self.logger.info( f"[{self.infrastructure.instance_name}] {offset}: Matching cms_main entry " f"'{cms_entry_name}' against detections" From 2aaf96d800ffe3acf695d0f623346468dbc900e8 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Thu, 23 Oct 2025 13:08:35 -0700 Subject: [PATCH 6/9] Remove debug code --- contentctl/objects/content_versioning_service.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 899bb8a1..116f7d11 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -421,19 +421,6 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: # Get the job as a blocking operation, set the cache, and return self._cms_main_job = self.service.search(query, exec_mode="blocking") # type: ignore - result_count = int(self._cms_main_job["resultCount"]) - - # Log a sample of the actual results (first 3) - if result_count > 0: - sample_results = [] - iterator = ResultIterator( - response_reader=self._cms_main_job.results( - output_mode="json", count=3, offset=0 - ), # type: ignore - error_filters=[], - ) - for result in iterator: - sample_results.append(result) return self._cms_main_job From 49893f2d13466002081d727dc9ed35f601a969a5 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Thu, 23 Oct 2025 14:31:14 -0700 Subject: [PATCH 7/9] Add error message and version validation --- contentctl/objects/content_versioning_service.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 116f7d11..ee5b32a2 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -251,11 +251,17 @@ def is_versioning_activated(self) -> bool: if self.kvstore_content_versioning: if "content" in data: for app in data["content"]: - if ( - app.get("name") == "DA-ESS-ContentUpdate" - and app.get("status") == "active" - ): - return True + if app.get("name") == "DA-ESS-ContentUpdate": + # If there is error message versioning is not activated properly + if "message" in app: + return False + + # If the installed verion is not the same as the test version + if app.get("version") != self.global_config.app.version: + return False + + if app.get("status") == "active": + return True else: return False if self.datastore_content_versioning: From 0849b6605064e18248133f460a7f2a4c1fa18b1b Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Mon, 10 Nov 2025 13:00:57 -0800 Subject: [PATCH 8/9] Add descriptive error message --- contentctl/objects/content_versioning_service.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index ee5b32a2..3cd3a689 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -420,7 +420,19 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: f'app_name="{self.global_config.app.appid}" | fields _raw' ) else: - raise Exception("Something went wrong with es version: {self.es_version}") + if self.kvstore_content_versioning: + raise Exception( + f"Unable to perform search to cms_content_lookup in ES version {self.es_version}" + ) + elif self.datastore_content_versioning: + raise Exception( + f"Unable to perform search to cms_main index in ES version {self.es_version}" + ) + else: + raise Exception( + f"Unable to determine content versioning method for ES version {self.es_version}. " + "Expected ES version >= 8.0.0." + ) self.logger.debug( f"[{self.infrastructure.instance_name}] Query on cms_main: {query}" ) From 5a346568389c0697257a452ab0ef6b1f81620d52 Mon Sep 17 00:00:00 2001 From: Xiaonan Qi Date: Mon, 10 Nov 2025 13:05:17 -0800 Subject: [PATCH 9/9] Update datastore to indexbased --- .../objects/content_versioning_service.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/contentctl/objects/content_versioning_service.py b/contentctl/objects/content_versioning_service.py index 3cd3a689..9e774f98 100644 --- a/contentctl/objects/content_versioning_service.py +++ b/contentctl/objects/content_versioning_service.py @@ -161,13 +161,13 @@ def kvstore_content_versioning(self) -> bool: return es_version is not None and es_version >= Version("8.3.0") @cached_property - def datastore_content_versioning(self) -> bool: + def indexbased_content_versioning(self) -> bool: """ - Indicates whether we should test content versioning based on datastore logic. Content versioning - should be tested with datastore logic when ES is less than version 8.3.0 but greater than or equal + Indicates whether we should test content versioning based on indexbased logic. Content versioning + should be tested with indexbased logic when ES is less than version 8.3.0 but greater than or equal to version 8.0.0. - :return: a bool indicating whether we should test content versioning with datastore logic + :return: a bool indicating whether we should test content versioning with indexbased logic :rtype: bool """ es_version = self.es_version @@ -203,7 +203,7 @@ def _query_content_versioning_service( path_segment="content_versioning/versioning_apps", app="SA-ContentVersioning", ) - if self.datastore_content_versioning: + if self.indexbased_content_versioning: response = self.service.request( # type: ignore method=method, path_segment="configs/conf-feature_flags/general", @@ -264,7 +264,7 @@ def is_versioning_activated(self) -> bool: return True else: return False - if self.datastore_content_versioning: + if self.indexbased_content_versioning: for entry in data["entry"]: if entry["name"] == "general": return bool(int(entry["content"]["versioning_activated"])) @@ -283,7 +283,7 @@ def activate_versioning(self) -> None: Activate the content versioning service """ # Post to the SA-ContentVersioning service to set versioning status - if self.datastore_content_versioning: + if self.indexbased_content_versioning: self._query_content_versioning_service( method="POST", body={"versioning_activated": True} ) @@ -414,7 +414,7 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: f"| inputlookup cms_content_lookup | search app_name={self.global_config.app.appid}" f"| fields content" ) - elif self.datastore_content_versioning: + elif self.indexbased_content_versioning: query = ( f"search index=cms_main sourcetype=stash_common_detection_model " f'app_name="{self.global_config.app.appid}" | fields _raw' @@ -424,7 +424,7 @@ def _query_cms_main(self, use_cache: bool = False) -> splunklib.Job: raise Exception( f"Unable to perform search to cms_content_lookup in ES version {self.es_version}" ) - elif self.datastore_content_versioning: + elif self.indexbased_content_versioning: raise Exception( f"Unable to perform search to cms_main index in ES version {self.es_version}" ) @@ -515,7 +515,7 @@ def validate_content_against_cms(self) -> None: if self.kvstore_content_versioning: cms_event = CMSEvent(content=cms_event["content"]) - elif self.datastore_content_versioning: + elif self.indexbased_content_versioning: cms_event = CMSEvent(content=cms_event["_raw"]) # Get the name of the search in the CMS event