diff --git a/src/sentry/data_export/base.py b/src/sentry/data_export/base.py
index 431eca2c0f3e74..eb72fad6412d80 100644
--- a/src/sentry/data_export/base.py
+++ b/src/sentry/data_export/base.py
@@ -29,9 +29,11 @@ class ExportQueryType:
     ISSUES_BY_TAG = 0
     DISCOVER = 1
     EXPLORE = 2
+    TRACE_ITEM_FULL_EXPORT = 3
     ISSUES_BY_TAG_STR = "Issues-by-Tag"
     DISCOVER_STR = "Discover"
     EXPLORE_STR = "Explore"
+    TRACE_ITEM_FULL_EXPORT_STR = "trace_item_full_export"
 
     @classmethod
     def as_choices(cls) -> tuple[tuple[int, str], ...]:
@@ -39,6 +41,7 @@ def as_choices(cls) -> tuple[tuple[int, str], ...]:
             (cls.ISSUES_BY_TAG, str(cls.ISSUES_BY_TAG_STR)),
             (cls.DISCOVER, str(cls.DISCOVER_STR)),
             (cls.EXPLORE, str(cls.EXPLORE_STR)),
+            (cls.TRACE_ITEM_FULL_EXPORT, str(cls.TRACE_ITEM_FULL_EXPORT_STR)),
         )
 
     @classmethod
@@ -47,6 +50,7 @@ def as_str_choices(cls) -> tuple[tuple[str, str], ...]:
             (cls.ISSUES_BY_TAG_STR, cls.ISSUES_BY_TAG_STR),
             (cls.DISCOVER_STR, cls.DISCOVER_STR),
             (cls.EXPLORE_STR, cls.EXPLORE_STR),
+            (cls.TRACE_ITEM_FULL_EXPORT_STR, cls.TRACE_ITEM_FULL_EXPORT_STR),
         )
 
     @classmethod
@@ -57,6 +61,8 @@ def as_str(cls, integer: int) -> str:
             return cls.DISCOVER_STR
         elif integer == cls.EXPLORE:
             return cls.EXPLORE_STR
+        elif integer == cls.TRACE_ITEM_FULL_EXPORT:
+            return cls.TRACE_ITEM_FULL_EXPORT_STR
         raise ValueError(f"Invalid ExportQueryType: {integer}")
 
     @classmethod
@@ -67,4 +73,6 @@ def from_str(cls, string: str) -> int:
             return cls.DISCOVER
         elif string == cls.EXPLORE_STR:
             return cls.EXPLORE
+        elif string == cls.TRACE_ITEM_FULL_EXPORT_STR:
+            return cls.TRACE_ITEM_FULL_EXPORT
         raise ValueError(f"Invalid ExportQueryType: {string}")
diff --git a/src/sentry/data_export/endpoints/data_export.py b/src/sentry/data_export/endpoints/data_export.py
index c02eb4edb8caa1..61e9b736749c87 100644
--- a/src/sentry/data_export/endpoints/data_export.py
+++ b/src/sentry/data_export/endpoints/data_export.py
@@ -67,7 +67,10 @@ def _validate_dataset(self, query_type: str, query_info: dict[str, Any]) -> dict
             dataset = dataset or "discover"
             if dataset not in SUPPORTED_DATASETS:
                 raise serializers.ValidationError(f"{dataset} is not supported for exports")
-        elif query_type == ExportQueryType.EXPLORE_STR:
+        elif query_type in (
+            ExportQueryType.EXPLORE_STR,
+            ExportQueryType.TRACE_ITEM_FULL_EXPORT_STR,
+        ):
             if not dataset:
                 raise serializers.ValidationError(
                     f"Please specify dataset. Supported datasets for this query type are {str(SUPPORTED_TRACE_ITEM_DATASETS.keys())}."
                 )
@@ -87,18 +90,13 @@ def _validate_query_info(
         elif not isinstance(base_fields, list):
             base_fields = [base_fields]
 
-        is_jsonl_trace_item_full_export = (
-            query_type == ExportQueryType.EXPLORE_STR
-            and export_format == OutputMode.JSONL.value
-            and len(base_fields) == 0
-        )
+        is_jsonl_trace_item_full_export = query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT_STR
 
         if len(base_fields) > MAX_FIELDS:
             detail = f"You can export up to {MAX_FIELDS} fields at a time. Please delete some and try again."
             raise serializers.ValidationError(detail)
-        elif len(base_fields) == 0:
-            if not is_jsonl_trace_item_full_export:
-                raise serializers.ValidationError("at least one field is required to export")
+        elif len(base_fields) == 0 and not is_jsonl_trace_item_full_export:
+            raise serializers.ValidationError("at least one field is required to export")
 
         if "query" not in query_info:
             if is_jsonl_trace_item_full_export:
@@ -136,7 +134,10 @@ def _validate_query_info(
             query_info["start"] = start.isoformat()
             query_info["end"] = end.isoformat()
 
-        if query_type == ExportQueryType.EXPLORE_STR:
+        if (
+            query_type == ExportQueryType.EXPLORE_STR
+            or query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT_STR
+        ):
             sort = query_info.get("sort", [])
             if sort and isinstance(sort, str):
                 sort = [sort]
@@ -215,38 +216,43 @@ def validate(self, data: dict[str, Any]) -> dict[str, Any]:
             )
             query_info = self._validate_dataset(query_type, query_info)
             explore_output_mode = OutputMode.from_value(export_format)
-            is_full_jsonl_trace_item_export = (
-                export_format == OutputMode.JSONL.value and len(query_info.get("field", [])) == 0
-            )
-            if not is_full_jsonl_trace_item_export:
-                try:
-                    explore_processor = ExploreProcessor(
-                        explore_query=query_info,
-                        organization=organization,
-                        output_mode=explore_output_mode,
-                    )
-                    sort = query_info.get("sort", [])
-                    orderby = [sort] if isinstance(sort, str) else sort
-
-                    explore_processor.validate_export_query(
-                        rpc_dataset_common.TableQuery(
-                            query_string=query_info["query"],
-                            selected_columns=query_info["field"],
-                            orderby=orderby,
-                            offset=0,
-                            limit=1,
-                            referrer=Referrer.DATA_EXPORT_TASKS_EXPLORE,
-                            sampling_mode=explore_processor.sampling_mode,
-                            resolver=explore_processor.search_resolver,
-                            equations=query_info.get("equations", []),
-                        )
+            try:
+                explore_processor = ExploreProcessor(
+                    explore_query=query_info,
+                    organization=organization,
+                    output_mode=explore_output_mode,
+                )
+                sort = query_info.get("sort", [])
+                orderby = [sort] if isinstance(sort, str) else sort
+
+                explore_processor.validate_export_query(
+                    rpc_dataset_common.TableQuery(
+                        query_string=query_info["query"],
+                        selected_columns=query_info["field"],
+                        orderby=orderby,
+                        offset=0,
+                        limit=1,
+                        referrer=Referrer.DATA_EXPORT_TASKS_EXPLORE,
+                        sampling_mode=explore_processor.sampling_mode,
+                        resolver=explore_processor.search_resolver,
+                        equations=query_info.get("equations", []),
                     )
-            except InvalidSearchQuery as err:
-                sentry_sdk.capture_exception(err)
-                raise serializers.ValidationError("Invalid table query.")
+                )
+            except InvalidSearchQuery as err:
+                sentry_sdk.capture_exception(err)
+                raise serializers.ValidationError("Invalid table query.")
+        elif query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT_STR:
+            query_info = self._validate_query_info(
+                query_type, query_info, export_format=export_format
+            )
+            query_info = self._validate_dataset(query_type, query_info)
+            explore_output_mode = OutputMode.from_value(export_format)
+            if explore_output_mode != OutputMode.JSONL:
+                raise serializers.ValidationError("For full export, output mode must be JSONL.")
         elif data["query_type"] == ExportQueryType.ISSUES_BY_TAG_STR:
             issues_by_tag_validate(query_info)
+
+        data["query_info"] = query_info
         return data
@@ -287,7 +293,7 @@ def _parse_limit(self, data: dict[str, Any]) -> tuple[int | None, bool]:
         run_sync = (
             limit is not None
             and limit <= MAX_SYNC_LIMIT
-            and data["query_type"] == ExportQueryType.EXPLORE_STR
+            and data["query_type"] == ExportQueryType.TRACE_ITEM_FULL_EXPORT_STR
             and data["query_info"].get("dataset") == "logs"
         )
         return limit, run_sync
@@ -308,7 +314,10 @@ def post(self, request: Request, organization: Organization) -> Response:
         # The data export feature is only available alongside `discover-query` (except for explore).
         # So to export issue tags, they must have have `discover-query`
        if not features.has("organizations:discover-query", organization):
-            if request.data.get("query_type") != ExportQueryType.EXPLORE_STR:
+            if request.data.get("query_type") not in {
+                ExportQueryType.EXPLORE_STR,
+                ExportQueryType.TRACE_ITEM_FULL_EXPORT_STR,
+            }:
                 return Response(status=404)
 
         # Get environment_id and limit if available
diff --git a/src/sentry/data_export/tasks.py b/src/sentry/data_export/tasks.py
index 4eacf5a5ac00c6..6480ed08922484 100644
--- a/src/sentry/data_export/tasks.py
+++ b/src/sentry/data_export/tasks.py
@@ -71,14 +71,6 @@ def _export_metric_tags(data_export: ExportedData) -> dict[str, str]:
     }
 
 
-def _is_full_jsonl_trace_item_export(data_export: ExportedData, output_mode: OutputMode) -> bool:
-    return (
-        data_export.query_type == ExportQueryType.EXPLORE
-        and output_mode == OutputMode.JSONL
-        and len(data_export.query_info.get("field", [])) == 0
-    )
-
-
 def _page_token_b64_from_processor(
     processor: IssuesByTagProcessor | DiscoverProcessor | ExploreProcessor,
 ) -> str | None:
@@ -464,25 +456,26 @@ def get_processor(
                 organization=data_export.organization,
             )
         elif data_export.query_type == ExportQueryType.EXPLORE:
-            if _is_full_jsonl_trace_item_export(data_export, output_mode):
-                page_token: bytes | None = None
-                if page_token_b64:
-                    try:
-                        page_token = base64.b64decode(page_token_b64)
-                    except (ValueError, TypeError) as e:
-                        raise ExportError("Invalid export trace item pagination state.") from e
-                return TraceItemFullExportProcessor(
-                    explore_query=data_export.query_info,
-                    organization=data_export.organization,
-                    output_mode=output_mode,
-                    page_token=page_token,
-                    last_emitted_item_id_hex=last_emitted_item_id_hex,
-                )
             return ExploreProcessor(
                 explore_query=data_export.query_info,
                 organization=data_export.organization,
                 output_mode=output_mode,
             )
+        elif data_export.query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT:
+            page_token: bytes | None = None
+            if page_token_b64:
+                try:
+                    page_token = base64.b64decode(page_token_b64)
+                except (ValueError, TypeError) as e:
+                    raise ExportError("Invalid export trace item pagination state.") from e
+            return TraceItemFullExportProcessor(
+                explore_query=data_export.query_info,
+                organization=data_export.organization,
+                output_mode=output_mode,
+                page_token=page_token,
+                last_emitted_item_id_hex=last_emitted_item_id_hex,
+            )
+
         else:
             raise ExportError(f"No processor found for this query type: {data_export.query_type}")
     except ExportError as error:
@@ -502,7 +495,10 @@ def process_rows(
             rows = process_issues_by_tag(processor, batch_size, offset)
         elif data_export.query_type == ExportQueryType.DISCOVER:
             rows = process_discover(processor, batch_size, offset)
-        elif data_export.query_type == ExportQueryType.EXPLORE:
+        elif (
+            data_export.query_type == ExportQueryType.EXPLORE
+            or data_export.query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT
+        ):
             rows = process_explore(processor, batch_size, offset)
         else:
             raise ExportError(f"No processor found for this query type: {data_export.query_type}")
diff --git a/tests/sentry/data_export/test_tasks.py b/tests/sentry/data_export/test_tasks.py
index cbd37451d0a098..19890b29efacb0 100644
--- a/tests/sentry/data_export/test_tasks.py
+++ b/tests/sentry/data_export/test_tasks.py
@@ -867,7 +867,7 @@ def _explore_logs_jsonl_rich_field_api_request_body(
         self, start: str, end: str, *, limit: int | None = None
     ) -> dict[str, Any]:
         body: dict[str, Any] = {
-            "query_type": ExportQueryType.EXPLORE_STR,
+            "query_type": ExportQueryType.TRACE_ITEM_FULL_EXPORT_STR,
             "format": OutputMode.JSONL.value,
             "query_info": {
                 "project": [self.project.id],
@@ -906,7 +906,7 @@ def _assert_explore_logs_jsonl_export_create_payload(
     ) -> ExportedData:
         de = ExportedData.objects.get(id=payload["id"])
         assert de.user_id == self.user.id
-        assert de.query_type == ExportQueryType.EXPLORE
+        assert de.query_type == ExportQueryType.TRACE_ITEM_FULL_EXPORT
         assert de.export_format == OutputMode.JSONL.value
         assert de.query_info["dataset"] == "logs"
         return de