diff --git a/src/sentry/search/eap/occurrences/aggregates.py b/src/sentry/search/eap/occurrences/aggregates.py
index fca83112b34712..f75ae1bced4faf 100644
--- a/src/sentry/search/eap/occurrences/aggregates.py
+++ b/src/sentry/search/eap/occurrences/aggregates.py
@@ -113,6 +113,24 @@
         ],
     ),
     "count_unique": count_unique_aggregate_definition(default_arg="group_id"),
+    "first_seen": AggregateDefinition(
+        internal_function=Function.FUNCTION_MIN,
+        default_search_type="integer",
+        infer_search_type_from_arguments=False,
+        arguments=[
+            AttributeArgumentDefinition(
+                attribute_types={
+                    "duration",
+                    "number",
+                    "integer",
+                    "string",
+                    *constants.SIZE_TYPE,
+                    *constants.DURATION_TYPE,
+                },
+                default_arg="timestamp",
+            )
+        ],
+    ),
     "last_seen": AggregateDefinition(
         internal_function=Function.FUNCTION_MAX,
         default_search_type="integer",
diff --git a/src/sentry/search/eap/occurrences/search_executor.py b/src/sentry/search/eap/occurrences/search_executor.py
new file mode 100644
index 00000000000000..d067efe80121a3
--- /dev/null
+++ b/src/sentry/search/eap/occurrences/search_executor.py
@@ -0,0 +1,287 @@
+import logging
+from collections.abc import Sequence
+from datetime import datetime
+from typing import Any
+
+from sentry.api.event_search import SearchFilter
+from sentry.models.environment import Environment
+from sentry.models.organization import Organization
+from sentry.models.project import Project
+from sentry.search.eap.occurrences.query_utils import build_group_id_in_filter
+from sentry.search.eap.types import SearchResolverConfig
+from sentry.search.events.types import SnubaParams
+from sentry.snuba.occurrences_rpc import Occurrences
+
+logger = logging.getLogger(__name__)
+
+
+# Filters that must be skipped because they have no EAP equivalent.
+# These would silently become dynamic tag lookups in the EAP SearchResolver
+# and produce incorrect results.
+# TODO: these are potentially gaps between existing issue feed search behavior and EAP search behavior. May need to address.
+SKIP_FILTERS: frozenset[str] = frozenset(
+    {
+        # event.type is added internally by _query_params_for_error(), not from user filters.
+        # EAP occurrences don't use event.type — they're pre-typed.
+        "event.type",
+        # Require Postgres Release table lookups (semver matching, stage resolution).
+        "release.stage",
+        "release.version",
+        "release.package",
+        "release.build",
+        # Virtual alias that expands to coalesce(user.email, user.username, ...).
+        # No EAP equivalent.
+        "user.display",
+        # Requires team context lookup.
+        "team_key_transaction",
+        # Requires Snuba-specific status code translation.
+        "transaction.status",
+    }
+)
+
+# Filters that need key name translation from legacy Snuba names to EAP attribute names.
+# TODO: instead of translating this key, maybe we should just set the public alias for this attribute to "error.main_thread"?
+TRANSLATE_KEYS: dict[str, str] = {
+    "error.main_thread": "exception_main_thread",
+}
+
+# Legacy aggregation field names → EAP aggregate function syntax.
+# In the legacy path these become HAVING clauses (e.g. times_seen:>100 → HAVING count() > 100).
+# The EAP SearchResolver parses function syntax like count():>100 as AggregateFilter objects
+# and routes them to the aggregation_filter field on the RPC request.
+AGGREGATION_FIELD_TO_EAP_FUNCTION: dict[str, str] = {
+    "last_seen": "last_seen()",
+    "times_seen": "count()",
+    "first_seen": "first_seen()",
+    "user_count": "count_unique(user)",
+}
+
+
+def search_filters_to_query_string(
+    search_filters: Sequence[SearchFilter],
+) -> str:
+    """
+    Convert Snuba-relevant SearchFilter objects to an EAP query string.
+
+    Expects filters that have already been stripped of postgres-only fields
+    (status, assigned_to, bookmarked_by, etc.) by the caller.
+
+    Returns a query string like: 'level:error platform:python message:"foo bar"'
+    compatible with the EAP SearchResolver's parse_search_query().
+ """ + parts: list[str] = [] + for sf in search_filters: + part = _convert_single_filter(sf) + if part is not None: + parts.append(part) + return " ".join(parts) + + +def _convert_single_filter(sf: SearchFilter) -> str | None: + key = sf.key.name + op = sf.operator + raw_value = sf.value.raw_value + + if key in AGGREGATION_FIELD_TO_EAP_FUNCTION: + return _convert_aggregation_filter(sf) + + if key in SKIP_FILTERS: + return None + + # error.unhandled requires special inversion logic. + # Legacy uses notHandled() Snuba function; EAP has error.handled attribute. + if key == "error.unhandled": + return _convert_error_unhandled(sf) + + if key in TRANSLATE_KEYS: + key = TRANSLATE_KEYS[key] + + # has / !has filters: empty string value with = or != + if raw_value == "" and op in ("=", "!="): + if op == "!=": + return f"has:{key}" + else: + return f"!has:{key}" + + formatted_value = _format_value(raw_value) + + if op == "=": + return f"{key}:{formatted_value}" + elif op == "!=": + return f"!{key}:{formatted_value}" + elif op in (">", ">=", "<", "<="): + return f"{key}:{op}{formatted_value}" + elif op == "IN": + return f"{key}:{formatted_value}" + elif op == "NOT IN": + return f"!{key}:{formatted_value}" + + logger.warning( + "eap.search_executor.unknown_operator", + extra={"key": key, "operator": op}, + ) + return None + + +def _convert_aggregation_filter(sf: SearchFilter) -> str | None: + eap_function = AGGREGATION_FIELD_TO_EAP_FUNCTION[sf.key.name] + formatted_value = _format_value(sf.value.raw_value) + + if sf.operator in (">", ">=", "<", "<="): + return f"{eap_function}:{sf.operator}{formatted_value}" + elif sf.operator == "=": + return f"{eap_function}:{formatted_value}" + elif sf.operator == "!=": + return f"!{eap_function}:{formatted_value}" + + return None + + +def _convert_error_unhandled(sf: SearchFilter) -> str | None: + raw_value = sf.value.raw_value + op = sf.operator + + is_looking_for_unhandled = (op == "=" and raw_value in ("1", 1, True, "true")) or ( + op 
== "!=" and raw_value in ("0", 0, False, "false") + ) + + if is_looking_for_unhandled: + return "!error.handled:1" + else: + return "error.handled:1" + + +def _format_value( + raw_value: str | int | float | datetime | Sequence[str] | Sequence[float], +) -> str: + if isinstance(raw_value, (list, tuple)): + parts = ", ".join(_format_single_value(v) for v in raw_value) + return f"[{parts}]" + if isinstance(raw_value, datetime): + return raw_value.isoformat() + if isinstance(raw_value, (int, float)): + return str(raw_value) + return _format_string_value(str(raw_value)) + + +def _format_single_value(value: str | int | float | datetime) -> str: + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, (int, float)): + return str(value) + return _format_string_value(str(value)) + + +def _format_string_value(s: str) -> str: + # Quote strings containing spaces or special characters. + if " " in s or '"' in s or "," in s or "(" in s or ")" in s: + escaped = s.replace("\\", "\\\\").replace('"', '\\"') + return f'"{escaped}"' + + return s + + +# Maps legacy sort_field names (from PostgresSnubaQueryExecutor.sort_strategies values) +# to (selected_columns, orderby) for EAP queries. 
+#
+# Reference — legacy sort_strategies in executors.py:
+# "date" → "last_seen" → max(timestamp) * 1000
+# "freq" → "times_seen" → count()
+# "new" → "first_seen" → min(coalesce(group_first_seen, timestamp)) * 1000
+# "user" → "user_count" → uniq(tags[sentry:user])
+# "trends" → "trends" → complex ClickHouse expression (not supported)
+# "recommended" → "recommended" → complex ClickHouse expression (not supported)
+# "inbox" → "" → Postgres only (not supported)
+EAP_SORT_STRATEGIES: dict[str, tuple[list[str], list[str]]] = {
+    "last_seen": (["group_id", "last_seen()"], ["-last_seen()"]),
+    "times_seen": (["group_id", "count()"], ["-count()"]),
+    "first_seen": (["group_id", "first_seen()"], ["-first_seen()"]),
+    "user_count": (["group_id", "count_unique(user)"], ["-count_unique(user)"]),
+}
+
+
+def run_eap_group_search(
+    start: datetime,
+    end: datetime,
+    project_ids: Sequence[int],
+    environment_ids: Sequence[int] | None,
+    sort_field: str,
+    organization: Organization,
+    group_ids: Sequence[int] | None = None,
+    limit: int | None = None,
+    offset: int = 0,
+    search_filters: Sequence[SearchFilter] | None = None,
+    referrer: str = "",
+) -> tuple[list[tuple[int, Any]], int]:
+    """
+    EAP equivalent of PostgresSnubaQueryExecutor.snuba_search().
+
+    Returns a tuple of:
+    * a list of (group_id, sort_score) tuples,
+    * total count (0 during double-reading; legacy provides the real total).
+    """
+    if sort_field not in EAP_SORT_STRATEGIES:
+        return ([], 0)
+
+    selected_columns, orderby = EAP_SORT_STRATEGIES[sort_field]
+    score_column = selected_columns[1]  # e.g. "last_seen()" or "count()"
+
+    projects = list(Project.objects.filter(id__in=project_ids))
+    if not projects:
+        return ([], 0)
+
+    environments: list[Environment] = []
+    if environment_ids:
+        environments = list(
+            Environment.objects.filter(organization_id=organization.id, id__in=environment_ids)
+        )
+
+    snuba_params = SnubaParams(
+        start=start,
+        end=end,
+        organization=organization,
+        projects=projects,
+        environments=environments,
+    )
+
+    query_string = search_filters_to_query_string(search_filters or [])
+
+    extra_conditions = None
+    if group_ids:
+        extra_conditions = build_group_id_in_filter(group_ids)
+
+    try:
+        result = Occurrences.run_table_query(
+            params=snuba_params,
+            query_string=query_string,
+            selected_columns=selected_columns,
+            orderby=orderby,
+            offset=offset,
+            limit=limit or 100,
+            referrer=referrer,
+            config=SearchResolverConfig(),
+            extra_conditions=extra_conditions,
+        )
+    except Exception:
+        logger.exception(
+            "eap.search_executor.run_table_query_failed",
+            extra={
+                "organization_id": organization.id,
+                "project_ids": project_ids,
+                "sort_field": sort_field,
+                "referrer": referrer,
+            },
+        )
+        return ([], 0)
+
+    tuples: list[tuple[int, Any]] = []
+    for row in result.get("data", []):
+        group_id = row.get("group_id")
+        score = row.get(score_column)
+        if group_id is not None:
+            tuples.append((int(group_id), score))
+
+    # TODO: the EAP RPC TraceItemTableResponse does not include a total count
+    # (unlike Snuba's totals=True). During double-reading the legacy result
+    # provides the real total, so we return 0 here.
+    return (tuples, 0)
diff --git a/tests/sentry/search/eap/test_search_executor.py b/tests/sentry/search/eap/test_search_executor.py
new file mode 100644
index 00000000000000..4a9d181c0eebe3
--- /dev/null
+++ b/tests/sentry/search/eap/test_search_executor.py
@@ -0,0 +1,356 @@
+from datetime import datetime, timedelta, timezone
+
+from sentry.api.event_search import SearchFilter, SearchKey, SearchValue
+from sentry.search.eap.occurrences.search_executor import (
+    run_eap_group_search,
+    search_filters_to_query_string,
+)
+from sentry.testutils.cases import OccurrenceTestCase, SnubaTestCase, TestCase
+
+
+class TestSearchFiltersToQueryString:
+    def test_all_operator_types(self):
+        cases = [
+            (SearchFilter(SearchKey("level"), "=", SearchValue("error")), "level:error"),
+            (SearchFilter(SearchKey("level"), "!=", SearchValue("error")), "!level:error"),
+            (SearchFilter(SearchKey("count"), ">", SearchValue("5")), "count:>5"),
+            (SearchFilter(SearchKey("count"), ">=", SearchValue("5")), "count:>=5"),
+            (SearchFilter(SearchKey("count"), "<", SearchValue("5")), "count:<5"),
+            (SearchFilter(SearchKey("count"), "<=", SearchValue("5")), "count:<=5"),
+            (
+                SearchFilter(SearchKey("level"), "IN", SearchValue(["error", "warning"])),
+                "level:[error, warning]",
+            ),
+            (
+                SearchFilter(SearchKey("level"), "NOT IN", SearchValue(["error", "warning"])),
+                "!level:[error, warning]",
+            ),
+        ]
+        for sf, expected in cases:
+            assert search_filters_to_query_string([sf]) == expected, (
+                f"Failed for operator {sf.operator}"
+            )
+
+    def test_value_formatting(self):
+        dt = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
+        cases = [
+            # Wildcards pass through as-is
+            (SearchFilter(SearchKey("message"), "=", SearchValue("*foo*")), "message:*foo*"),
+            # Spaces trigger quoting
+            (
+                SearchFilter(SearchKey("message"), "=", SearchValue("foo bar")),
+                'message:"foo bar"',
+            ),
+            # Embedded quotes are escaped
+            (
+                SearchFilter(SearchKey("message"), "=", SearchValue('foo "bar"')),
+                'message:"foo \\"bar\\""',
+            ),
+            # Numeric values
+            (SearchFilter(SearchKey("count"), "=", SearchValue(42)), "count:42"),
+            (SearchFilter(SearchKey("count"), ">", SearchValue(3.14)), "count:>3.14"),
+            # Datetime values
+            (
+                SearchFilter(SearchKey("timestamp"), ">", SearchValue(dt)),
+                "timestamp:>2024-01-15T12:00:00+00:00",
+            ),
+            # Tags pass through
+            (
+                SearchFilter(SearchKey("tags[browser]"), "=", SearchValue("chrome")),
+                "tags[browser]:chrome",
+            ),
+        ]
+        for sf, expected in cases:
+            assert search_filters_to_query_string([sf]) == expected
+
+    def test_has_and_not_has_filters(self):
+        # has:user.email → parsed as op=!=, value=""
+        has_filter = SearchFilter(SearchKey("user.email"), "!=", SearchValue(""))
+        assert search_filters_to_query_string([has_filter]) == "has:user.email"
+
+        # !has:user.email → parsed as op==, value=""
+        not_has_filter = SearchFilter(SearchKey("user.email"), "=", SearchValue(""))
+        assert search_filters_to_query_string([not_has_filter]) == "!has:user.email"
+
+    def test_skipped_filters_are_dropped(self):
+        filters = [
+            SearchFilter(SearchKey("event.type"), "=", SearchValue("error")),
+            SearchFilter(SearchKey("release.stage"), "=", SearchValue("adopted")),
+            SearchFilter(SearchKey("release.version"), ">", SearchValue("1.0.0")),
+            SearchFilter(SearchKey("release.package"), "=", SearchValue("com.example")),
+            SearchFilter(SearchKey("release.build"), "=", SearchValue("123")),
+            SearchFilter(SearchKey("user.display"), "=", SearchValue("john")),
+            SearchFilter(SearchKey("team_key_transaction"), "=", SearchValue("1")),
+            SearchFilter(SearchKey("transaction.status"), "=", SearchValue("ok")),
+        ]
+        assert search_filters_to_query_string(filters) == ""
+
+    def test_aggregation_filters_translated(self):
+        dt = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
+        cases = [
+            (
+                SearchFilter(SearchKey("times_seen"), ">", SearchValue("100")),
+                "count():>100",
+            ),
+            (
+                SearchFilter(SearchKey("times_seen"), "<=", SearchValue("50")),
+                "count():<=50",
+            ),
+            (
+ SearchFilter(SearchKey("last_seen"), ">", SearchValue(dt)), + "last_seen():>2024-01-15T12:00:00+00:00", + ), + ( + SearchFilter(SearchKey("user_count"), ">", SearchValue("5")), + "count_unique(user):>5", + ), + ] + for sf, expected in cases: + assert search_filters_to_query_string([sf]) == expected, ( + f"Failed for {sf.key.name}:{sf.operator}{sf.value.raw_value}" + ) + + def test_error_unhandled_translation(self): + # error.unhandled:1 → looking for unhandled → !error.handled:1 + assert ( + search_filters_to_query_string( + [SearchFilter(SearchKey("error.unhandled"), "=", SearchValue("1"))] + ) + == "!error.handled:1" + ) + # error.unhandled:0 → looking for handled → error.handled:1 + assert ( + search_filters_to_query_string( + [SearchFilter(SearchKey("error.unhandled"), "=", SearchValue("0"))] + ) + == "error.handled:1" + ) + # !error.unhandled:1 → looking for handled → error.handled:1 + assert ( + search_filters_to_query_string( + [SearchFilter(SearchKey("error.unhandled"), "!=", SearchValue("1"))] + ) + == "error.handled:1" + ) + + def test_error_main_thread_key_translated(self): + filters = [SearchFilter(SearchKey("error.main_thread"), "=", SearchValue("1"))] + assert search_filters_to_query_string(filters) == "exception_main_thread:1" + + def test_realistic_mixed_query(self): + filters = [ + SearchFilter(SearchKey("level"), "=", SearchValue("error")), + SearchFilter(SearchKey("error.unhandled"), "=", SearchValue("1")), + SearchFilter(SearchKey("times_seen"), ">", SearchValue("50")), + SearchFilter(SearchKey("platform"), "IN", SearchValue(["python", "javascript"])), + SearchFilter(SearchKey("release.version"), ">", SearchValue("2.0.0")), + SearchFilter(SearchKey("tags[browser]"), "=", SearchValue("chrome")), + ] + result = search_filters_to_query_string(filters) + assert result == ( + "level:error !error.handled:1 count():>50" + " platform:[python, javascript] tags[browser]:chrome" + ) + + +class TestRunEAPGroupSearch(TestCase, SnubaTestCase, 
OccurrenceTestCase): + def setUp(self) -> None: + super().setUp() + self.now = datetime.now(timezone.utc) + self.start = self.now - timedelta(hours=1) + self.end = self.now + timedelta(hours=1) + + self.group1 = self.create_group(project=self.project) + self.group2 = self.create_group(project=self.project) + + # Store 3 error occurrences for group1, 1 warning for group2 + for _ in range(3): + occ = self.create_eap_occurrence( + group_id=self.group1.id, + level="error", + timestamp=self.now - timedelta(minutes=5), + ) + self.store_eap_items([occ]) + + occ = self.create_eap_occurrence( + group_id=self.group2.id, + level="warning", + timestamp=self.now - timedelta(minutes=10), + ) + self.store_eap_items([occ]) + + def test_last_seen_sort(self) -> None: + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="last_seen", + organization=self.organization, + referrer="test", + ) + group_ids = [gid for gid, _ in result] + assert len(group_ids) == 2 + assert group_ids[0] == self.group1.id + assert group_ids[1] == self.group2.id + + def test_times_seen_sort(self) -> None: + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="times_seen", + organization=self.organization, + referrer="test", + ) + group_ids = [gid for gid, _ in result] + assert len(group_ids) == 2 + assert group_ids[0] == self.group1.id + assert group_ids[1] == self.group2.id + + def test_first_seen_sort(self) -> None: + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="first_seen", + organization=self.organization, + referrer="test", + ) + group_ids = [gid for gid, _ in result] + assert len(group_ids) == 2 + assert group_ids[0] == self.group1.id + assert group_ids[1] == self.group2.id + + def test_user_count_sort(self) -> None: + group3 = 
self.create_group(project=self.project) + for i in range(3): + occ = self.create_eap_occurrence( + group_id=group3.id, + level="error", + timestamp=self.now - timedelta(minutes=3), + tags={"sentry:user": f"user-{i}@example.com"}, + ) + self.store_eap_items([occ]) + + occ = self.create_eap_occurrence( + group_id=self.group1.id, + level="error", + timestamp=self.now - timedelta(minutes=3), + tags={"sentry:user": "only-user@example.com"}, + ) + self.store_eap_items([occ]) + + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="user_count", + organization=self.organization, + referrer="test", + ) + group_ids = [gid for gid, _ in result] + assert len(group_ids) == 2 + assert group_ids[0] == group3.id + assert group_ids[1] == self.group1.id + + def test_unsupported_sort_returns_empty(self) -> None: + result, total = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="trends", + organization=self.organization, + referrer="test", + ) + assert result == [] + assert total == 0 + + def test_filter_narrows_results(self) -> None: + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="last_seen", + organization=self.organization, + search_filters=[SearchFilter(SearchKey("level"), "=", SearchValue("error"))], + referrer="test", + ) + group_ids = {gid for gid, _ in result} + assert group_ids == {self.group1.id} + + def test_group_id_pre_filter(self) -> None: + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="last_seen", + organization=self.organization, + group_ids=[self.group1.id], + referrer="test", + ) + assert {gid for gid, _ in result} == {self.group1.id} + + def test_environment_filter(self) -> None: + env = 
self.create_environment(project=self.project, name="production") + occ = self.create_eap_occurrence( + group_id=self.group1.id, + level="error", + environment="production", + timestamp=self.now - timedelta(minutes=2), + ) + self.store_eap_items([occ]) + + occ2 = self.create_eap_occurrence( + group_id=self.group2.id, + level="warning", + environment="staging", + timestamp=self.now - timedelta(minutes=2), + ) + self.store_eap_items([occ2]) + + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=[env.id], + sort_field="last_seen", + organization=self.organization, + referrer="test", + ) + group_ids = {gid for gid, _ in result} + assert self.group1.id in group_ids + assert self.group2.id not in group_ids + + def test_sort_and_filter(self) -> None: + group3 = self.create_group(project=self.project) + for i in range(5): + occ = self.create_eap_occurrence( + group_id=group3.id, + level="error", + timestamp=self.now - timedelta(minutes=1 + i), + ) + self.store_eap_items([occ]) + + result, _ = run_eap_group_search( + start=self.start, + end=self.end, + project_ids=[self.project.id], + environment_ids=None, + sort_field="times_seen", + organization=self.organization, + group_ids=[self.group1.id, group3.id], + search_filters=[SearchFilter(SearchKey("level"), "=", SearchValue("error"))], + referrer="test", + ) + group_ids = [gid for gid, _ in result] + assert len(group_ids) == 2 + assert group_ids[0] == group3.id + assert group_ids[1] == self.group1.id + assert self.group2.id not in group_ids