Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/sentry/search/eap/occurrences/aggregates.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,24 @@
],
),
"count_unique": count_unique_aggregate_definition(default_arg="group_id"),
"first_seen": AggregateDefinition(
internal_function=Function.FUNCTION_MIN,
default_search_type="integer",
infer_search_type_from_arguments=False,
arguments=[
AttributeArgumentDefinition(
attribute_types={
"duration",
"number",
"integer",
"string",
*constants.SIZE_TYPE,
*constants.DURATION_TYPE,
},
default_arg="timestamp",
)
],
),
"last_seen": AggregateDefinition(
internal_function=Function.FUNCTION_MAX,
default_search_type="integer",
Expand Down
287 changes: 287 additions & 0 deletions src/sentry/search/eap/occurrences/search_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import logging
from collections.abc import Sequence
from datetime import datetime
from typing import Any

from sentry.api.event_search import SearchFilter
from sentry.models.environment import Environment
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.search.eap.occurrences.query_utils import build_group_id_in_filter
from sentry.search.eap.types import SearchResolverConfig
from sentry.search.events.types import SnubaParams
from sentry.snuba.occurrences_rpc import Occurrences

logger = logging.getLogger(__name__)


# Filters that must be skipped because they have no EAP equivalent.
# These would silently become dynamic tag lookups in the EAP SearchResolver
# and produce incorrect results.
# TODO: these are potentially gaps between existing issue feed search behavior and EAP search behavior. May need to adddress.
SKIP_FILTERS: frozenset[str] = frozenset(
{
# event.type is added internally by _query_params_for_error(), not from user filters.
# EAP occurrences don't use event.type — they're pre-typed.
"event.type",
# Require Postgres Release table lookups (semver matching, stage resolution).
"release.stage",
"release.version",
"release.package",
"release.build",
# Virtual alias that expands to coalesce(user.email, user.username, ...).
# No EAP equivalent.
"user.display",
# Requires team context lookup.
"team_key_transaction",
# Requires Snuba-specific status code translation.
"transaction.status",
}
)

# Filters that need key name translation from legacy Snuba names to EAP attribute names.
# TODO: instead of translating this key, maybe we should just set the public alias for this attribute to "error.main_thread"?
TRANSLATE_KEYS: dict[str, str] = {
"error.main_thread": "exception_main_thread",
}

# Legacy aggregation field names → EAP aggregate function syntax.
# In the legacy path these become HAVING clauses (e.g. times_seen:>100 → HAVING count() > 100).
# The EAP SearchResolver parses function syntax like count():>100 as AggregateFilter objects
# and routes them to the aggregation_filter field on the RPC request.
AGGREGATION_FIELD_TO_EAP_FUNCTION: dict[str, str] = {
"last_seen": "last_seen()",
"times_seen": "count()",
"first_seen": "first_seen()",
"user_count": "count_unique(user)",
}
Comment thread
shashjar marked this conversation as resolved.
Comment thread
shashjar marked this conversation as resolved.


def search_filters_to_query_string(
search_filters: Sequence[SearchFilter],
) -> str:
"""
Convert Snuba-relevant SearchFilter objects to an EAP query string.

Expects filters that have already been stripped of postgres-only fields
(status, assigned_to, bookmarked_by, etc.) by the caller.

Returns a query string like: 'level:error platform:python message:"foo bar"'
compatible with the EAP SearchResolver's parse_search_query().
"""
parts: list[str] = []
for sf in search_filters:
part = _convert_single_filter(sf)
if part is not None:
parts.append(part)
return " ".join(parts)


def _convert_single_filter(sf: SearchFilter) -> str | None:
key = sf.key.name
op = sf.operator
raw_value = sf.value.raw_value

if key in AGGREGATION_FIELD_TO_EAP_FUNCTION:
return _convert_aggregation_filter(sf)

if key in SKIP_FILTERS:
return None

# error.unhandled requires special inversion logic.
# Legacy uses notHandled() Snuba function; EAP has error.handled attribute.
if key == "error.unhandled":
return _convert_error_unhandled(sf)

if key in TRANSLATE_KEYS:
key = TRANSLATE_KEYS[key]

# has / !has filters: empty string value with = or !=
if raw_value == "" and op in ("=", "!="):
if op == "!=":
return f"has:{key}"
else:
return f"!has:{key}"

formatted_value = _format_value(raw_value)

if op == "=":
return f"{key}:{formatted_value}"
elif op == "!=":
return f"!{key}:{formatted_value}"
elif op in (">", ">=", "<", "<="):
return f"{key}:{op}{formatted_value}"
elif op == "IN":
return f"{key}:{formatted_value}"
elif op == "NOT IN":
return f"!{key}:{formatted_value}"

logger.warning(
"eap.search_executor.unknown_operator",
extra={"key": key, "operator": op},
)
return None


def _convert_aggregation_filter(sf: SearchFilter) -> str | None:
eap_function = AGGREGATION_FIELD_TO_EAP_FUNCTION[sf.key.name]
formatted_value = _format_value(sf.value.raw_value)

if sf.operator in (">", ">=", "<", "<="):
return f"{eap_function}:{sf.operator}{formatted_value}"
elif sf.operator == "=":
return f"{eap_function}:{formatted_value}"
elif sf.operator == "!=":
return f"!{eap_function}:{formatted_value}"

return None


def _convert_error_unhandled(sf: SearchFilter) -> str | None:
raw_value = sf.value.raw_value
op = sf.operator

is_looking_for_unhandled = (op == "=" and raw_value in ("1", 1, True, "true")) or (
op == "!=" and raw_value in ("0", 0, False, "false")
)

if is_looking_for_unhandled:
return "!error.handled:1"
else:
return "error.handled:1"


def _format_value(
raw_value: str | int | float | datetime | Sequence[str] | Sequence[float],
) -> str:
if isinstance(raw_value, (list, tuple)):
parts = ", ".join(_format_single_value(v) for v in raw_value)
return f"[{parts}]"
if isinstance(raw_value, datetime):
return raw_value.isoformat()
if isinstance(raw_value, (int, float)):
return str(raw_value)
return _format_string_value(str(raw_value))


def _format_single_value(value: str | int | float | datetime) -> str:
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, (int, float)):
return str(value)
return _format_string_value(str(value))


def _format_string_value(s: str) -> str:
# Quote strings containing spaces or special characters.
if " " in s or '"' in s or "," in s or "(" in s or ")" in s:
escaped = s.replace("\\", "\\\\").replace('"', '\\"')
Comment on lines +177 to +178
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The _format_string_value function doesn't quote strings containing square brackets [ or ], which are special characters in EAP query syntax and may cause parsing errors.
Severity: MEDIUM

Suggested Fix

Update the conditional check in the _format_string_value function to include square brackets. Add checks for "[" in s and "]" in s to the if statement on line 177 to ensure strings containing these characters are properly quoted.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: src/sentry/search/eap/occurrences/search_executor.py#L177-L178

Potential issue: The `_format_string_value` function checks for special characters to
determine if a string value needs quoting, but it does not include square brackets (`[`
and `]`). Since square brackets are used for list syntax in EAP queries, a field value
containing them (e.g., `config[production]`) will not be quoted. This can cause the EAP
parser to misinterpret the literal string value as list syntax, potentially leading to
incorrect query behavior or parsing errors. The function already handles other
syntactically significant characters like parentheses and commas, suggesting the
omission of brackets is an oversight.

return f'"{escaped}"'

return s


# Maps legacy sort_field names (from PostgresSnubaQueryExecutor.sort_strategies values)
# to (selected_columns, orderby) for EAP queries.
#
# Reference — legacy sort_strategies in executors.py:
# "date" → "last_seen" → max(timestamp) * 1000
# "freq" → "times_seen" → count()
# "new" → "first_seen" → min(coalesce(group_first_seen, timestamp)) * 1000
# "user" → "user_count" → uniq(tags[sentry:user])
# "trends" → "trends" → complex ClickHouse expression (not supported)
# "recommended" → "recommended" → complex ClickHouse expression (not supported)
# "inbox" → "" → Postgres only (not supported)
EAP_SORT_STRATEGIES: dict[str, tuple[list[str], list[str]]] = {
"last_seen": (["group_id", "last_seen()"], ["-last_seen()"]),
"times_seen": (["group_id", "count()"], ["-count()"]),
"first_seen": (["group_id", "first_seen()"], ["-first_seen()"]),
Comment thread
shashjar marked this conversation as resolved.
"user_count": (["group_id", "count_unique(user)"], ["-count_unique(user)"]),
}


def run_eap_group_search(
start: datetime,
end: datetime,
project_ids: Sequence[int],
environment_ids: Sequence[int] | None,
sort_field: str,
organization: Organization,
group_ids: Sequence[int] | None = None,
limit: int | None = None,
offset: int = 0,
search_filters: Sequence[SearchFilter] | None = None,
referrer: str = "",
) -> tuple[list[tuple[int, Any]], int]:
"""
EAP equivalent of PostgresSnubaQueryExecutor.snuba_search().

Returns a tuple of:
* a list of (group_id, sort_score) tuples,
* total count (0 during double-reading; legacy provides the real total).
"""
if sort_field not in EAP_SORT_STRATEGIES:
return ([], 0)

selected_columns, orderby = EAP_SORT_STRATEGIES[sort_field]
score_column = selected_columns[1] # e.g. "last_seen()" or "count()"

projects = list(Project.objects.filter(id__in=project_ids))
if not projects:
return ([], 0)

environments: list[Environment] = []
if environment_ids:
environments = list(
Environment.objects.filter(organization_id=organization.id, id__in=environment_ids)
)

snuba_params = SnubaParams(
start=start,
end=end,
organization=organization,
projects=projects,
environments=environments,
)

query_string = search_filters_to_query_string(search_filters or [])

extra_conditions = None
if group_ids:
extra_conditions = build_group_id_in_filter(group_ids)

try:
result = Occurrences.run_table_query(
params=snuba_params,
query_string=query_string,
selected_columns=selected_columns,
orderby=orderby,
offset=offset,
limit=limit or 100,
referrer=referrer,
config=SearchResolverConfig(),
extra_conditions=extra_conditions,
)
except Exception:
logger.exception(
"eap.search_executor.run_table_query_failed",
extra={
"organization_id": organization.id,
"project_ids": project_ids,
"sort_field": sort_field,
"referrer": referrer,
},
)
return ([], 0)

tuples: list[tuple[int, Any]] = []
for row in result.get("data", []):
group_id = row.get("group_id")
score = row.get(score_column)
if group_id is not None:
tuples.append((int(group_id), score))

# TODO: the EAP RPC TraceItemTableResponse does not include a total count
# (unlike Snuba's totals=True). During double-reading the legacy result
# provides the real total, so we return 0 here.
return (tuples, 0)
Loading
Loading