Skip to content

Commit 7a1cbce

Browse files
authored
feat(occurrences on eap): Implement EAP query for eventstore get_events (#112423)
Implements an EAP query (double read) for the eventstore's `__get_events` function (used by both `get_events` & `get_unfetched_events`) in `src/sentry/services/eventstore/snuba/backend.py`. Rather than write logic to convert an `eventstore.Filter` value to the equivalent format needed for an EAP query, I'm adding logic here to create `eap_conditions` at the individual callsites.
1 parent e5b1933 commit 7a1cbce

File tree

15 files changed

+480
-16
lines changed

15 files changed

+480
-16
lines changed

src/sentry/api/serializers/models/userreport.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from sentry.models.group import Group
99
from sentry.models.project import Project
1010
from sentry.models.userreport import UserReport
11+
from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter
1112
from sentry.services import eventstore
1213
from sentry.services.eventstore.models import Event
1314
from sentry.snuba.dataset import Dataset
@@ -46,12 +47,14 @@ def get_attrs(self, item_list, user, **kwargs):
4647
project = Project.objects.get(id=item_list[0].project_id)
4748
retention = quotas.backend.get_event_retention(organization=project.organization)
4849

50+
event_ids = [item.event_id for item in item_list]
4951
events = eventstore.backend.get_events(
5052
filter=eventstore.Filter(
51-
event_ids=[item.event_id for item in item_list],
53+
event_ids=event_ids,
5254
project_ids=[project.id],
5355
start=timezone.now() - timedelta(days=retention) if retention else None,
5456
),
57+
eap_conditions=build_event_id_in_filter(event_ids),
5558
referrer="UserReportSerializer.get_attrs",
5659
dataset=Dataset.Events,
5760
tenant_ids={"organization_id": project.organization_id},

src/sentry/deletions/tasks/nodestore.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any
55

66
import sentry_sdk
7+
from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter
78
from snuba_sdk import DeleteQuery, Request
89
from taskbroker_client.retry import Retry
910

@@ -13,6 +14,11 @@
1314
from sentry.exceptions import DeleteAborted
1415
from sentry.models.eventattachment import EventAttachment
1516
from sentry.models.userreport import UserReport
17+
from sentry.search.eap.occurrences.query_utils import (
18+
build_group_id_in_filter,
19+
build_keyset_pagination_filter,
20+
)
21+
from sentry.search.eap.rpc_utils import and_trace_item_filters
1622
from sentry.services import eventstore
1723
from sentry.services.eventstore.models import Event
1824
from sentry.silo.base import SiloMode
@@ -155,20 +161,29 @@ def fetch_events_from_eventstore(
155161
) -> list[Event]:
156162
logger.info("Fetching %s events for deletion.", limit)
157163
conditions = []
164+
eap_conditions: TraceItemFilter | None = build_group_id_in_filter(group_ids)
158165
if last_event_id and last_event_timestamp:
159166
conditions.extend(
160167
[
161168
["timestamp", "<=", last_event_timestamp],
162169
[["timestamp", "<", last_event_timestamp], ["event_id", "<", last_event_id]],
163170
]
164171
)
172+
eap_conditions = and_trace_item_filters(
173+
eap_conditions,
174+
build_keyset_pagination_filter(
175+
timestamp_value=last_event_timestamp,
176+
event_id=last_event_id,
177+
),
178+
)
165179

166180
events = eventstore.backend.get_unfetched_events(
167181
filter=eventstore.Filter(
168182
conditions=conditions,
169183
project_ids=[project_id],
170184
group_ids=group_ids,
171185
),
186+
eap_conditions=eap_conditions,
172187
limit=limit,
173188
referrer=referrer,
174189
orderby=["-timestamp", "-event_id"],

src/sentry/feedback/tasks/update_user_reports.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from sentry.feedback.usecases.ingest.shim_to_feedback import shim_to_feedback
1010
from sentry.models.project import Project
1111
from sentry.models.userreport import UserReport
12+
from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter
1213
from sentry.services import eventstore
1314
from sentry.silo.base import SiloMode
1415
from sentry.snuba.referrer import Referrer
@@ -90,7 +91,9 @@ def update_user_reports(
9091
)
9192
try:
9293
events_chunk = eventstore.backend.get_events(
93-
filter=snuba_filter, referrer=Referrer.TASKS_UPDATE_USER_REPORTS.value
94+
filter=snuba_filter,
95+
eap_conditions=build_event_id_in_filter(event_id_chunk),
96+
referrer=Referrer.TASKS_UPDATE_USER_REPORTS.value,
9497
)
9598
events.extend(events_chunk)
9699
except Exception:

src/sentry/issues/endpoints/organization_eventid.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from drf_spectacular.utils import extend_schema
44
from rest_framework.request import Request
55
from rest_framework.response import Response
6+
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
7+
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter
68

79
from sentry.api.api_owners import ApiOwner
810
from sentry.api.api_publish_status import ApiPublishStatus
@@ -18,6 +20,8 @@
1820
from sentry.models.organization import Organization
1921
from sentry.models.project import Project
2022
from sentry.ratelimits.config import RateLimitConfig
23+
from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter
24+
from sentry.search.eap.rpc_utils import and_trace_item_filters
2125
from sentry.services import eventstore
2226
from sentry.types.ratelimit import RateLimit, RateLimitCategory
2327
from sentry.utils.validators import INVALID_ID_DETAILS, is_event_id
@@ -76,8 +80,21 @@ def get(self, request: Request, organization: Organization, event_id: str) -> Re
7680
project_ids=list(project_slugs_by_id.keys()),
7781
event_ids=[event_id],
7882
)
83+
eap_conditions = and_trace_item_filters(
84+
build_event_id_in_filter([event_id]),
85+
TraceItemFilter(
86+
comparison_filter=ComparisonFilter(
87+
key=AttributeKey(name="type", type=AttributeKey.TYPE_STRING),
88+
op=ComparisonFilter.OP_NOT_EQUALS,
89+
value=AttributeValue(val_str="transaction"),
90+
)
91+
),
92+
)
7993
event = eventstore.backend.get_events(
80-
filter=snuba_filter, limit=1, tenant_ids={"organization_id": organization.id}
94+
filter=snuba_filter,
95+
eap_conditions=eap_conditions,
96+
limit=1,
97+
tenant_ids={"organization_id": organization.id},
8198
)[0]
8299
except IndexError:
83100
raise ResourceDoesNotExist()

src/sentry/issues/endpoints/project_events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from rest_framework.exceptions import ParseError
55
from rest_framework.request import Request
66
from rest_framework.response import Response
7+
from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter
78

89
from sentry.api.api_owners import ApiOwner
910
from sentry.api.api_publish_status import ApiPublishStatus
@@ -95,6 +96,7 @@ def get(self, request: Request, project: Project) -> Response:
9596
data_fn = partial(
9697
eventstore.backend.get_events,
9798
filter=event_filter,
99+
eap_conditions=TraceItemFilter(), # TODO: not currently taking the query into account
98100
referrer="api.project-events",
99101
tenant_ids={"organization_id": project.organization_id},
100102
)

src/sentry/models/group.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from django.utils import timezone
1919
from django.utils.http import urlencode
2020
from django.utils.translation import gettext_lazy as _
21+
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey
22+
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter
2123
from snuba_sdk import Column, Condition, Op
2224

2325
from sentry import eventstore, eventtypes, options, tagstore
@@ -44,6 +46,8 @@
4446
from sentry.models.commit import Commit
4547
from sentry.models.grouphistory import record_group_history, record_group_history_from_activity_type
4648
from sentry.models.organization import Organization
49+
from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter
50+
from sentry.search.eap.rpc_utils import and_trace_item_filters
4751
from sentry.services.eventstore.models import GroupEvent
4852
from sentry.snuba.dataset import Dataset
4953
from sentry.snuba.referrer import Referrer
@@ -435,6 +439,14 @@ def filter_by_event_id(self, project_ids, event_id, tenant_ids=None):
435439
project_ids=project_ids,
436440
conditions=[["group_id", "IS NOT NULL", None]],
437441
),
442+
eap_conditions=and_trace_item_filters(
443+
build_event_id_in_filter([event_id]),
444+
TraceItemFilter(
445+
exists_filter=ExistsFilter(
446+
key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT)
447+
)
448+
),
449+
),
438450
limit=max(len(project_ids), 100),
439451
referrer="Group.filter_by_event_id",
440452
tenant_ids=tenant_ids,

src/sentry/search/eap/occurrences/query_utils.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,18 @@
22
from datetime import datetime
33
from typing import Any
44

5+
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
6+
AttributeKey,
7+
AttributeValue,
8+
IntArray,
9+
StrArray,
10+
)
11+
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter
12+
513
from sentry.models.environment import Environment
614
from sentry.models.organization import Organization
715
from sentry.models.project import Project
16+
from sentry.search.eap.rpc_utils import and_trace_item_filters, or_trace_item_filters
817
from sentry.search.events.types import SnubaParams
918

1019

@@ -74,3 +83,56 @@ def _to_count_map(rows: Sequence[Mapping[str, Any]]) -> dict[Hashable, int]:
7483
return False
7584

7685
return all(exp_count <= control_map[key] for key, exp_count in experimental_map.items())
86+
87+
88+
def build_group_id_in_filter(group_ids: Sequence[int]) -> TraceItemFilter:
89+
return TraceItemFilter(
90+
comparison_filter=ComparisonFilter(
91+
key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT),
92+
op=ComparisonFilter.OP_IN,
93+
value=AttributeValue(val_int_array=IntArray(values=list(group_ids))),
94+
)
95+
)
96+
97+
98+
def build_event_id_in_filter(event_ids: Sequence[str]) -> TraceItemFilter:
99+
return TraceItemFilter(
100+
comparison_filter=ComparisonFilter(
101+
key=AttributeKey(name="sentry.item_id", type=AttributeKey.TYPE_STRING),
102+
op=ComparisonFilter.OP_IN,
103+
value=AttributeValue(val_str_array=StrArray(values=list(event_ids))),
104+
)
105+
)
106+
107+
108+
def build_keyset_pagination_filter(
109+
timestamp_value: str,
110+
event_id: str,
111+
) -> TraceItemFilter | None:
112+
ts_epoch = datetime.fromisoformat(timestamp_value).timestamp()
113+
timestamp_key = AttributeKey(name="sentry.timestamp", type=AttributeKey.TYPE_DOUBLE)
114+
event_id_key = AttributeKey(name="sentry.item_id", type=AttributeKey.TYPE_STRING)
115+
116+
ts_lte = TraceItemFilter(
117+
comparison_filter=ComparisonFilter(
118+
key=timestamp_key,
119+
op=ComparisonFilter.OP_LESS_THAN_OR_EQUALS,
120+
value=AttributeValue(val_double=ts_epoch),
121+
)
122+
)
123+
ts_lt = TraceItemFilter(
124+
comparison_filter=ComparisonFilter(
125+
key=timestamp_key,
126+
op=ComparisonFilter.OP_LESS_THAN,
127+
value=AttributeValue(val_double=ts_epoch),
128+
)
129+
)
130+
eid_lt = TraceItemFilter(
131+
comparison_filter=ComparisonFilter(
132+
key=event_id_key,
133+
op=ComparisonFilter.OP_LESS_THAN,
134+
value=AttributeValue(val_str=event_id),
135+
)
136+
)
137+
138+
return and_trace_item_filters(ts_lte, or_trace_item_filters(ts_lt, eid_lt))

src/sentry/seer/explorer/tools.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from sentry.replays.post_process import process_raw_response
3333
from sentry.replays.query import query_replay_id_by_prefix, query_replay_instance
3434
from sentry.search.eap.constants import BOOLEAN, DOUBLE, INT, STRING
35+
from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter
3536
from sentry.search.eap.resolver import SearchResolver
3637
from sentry.search.eap.types import SearchResolverConfig
3738
from sentry.search.events.constants import ISSUE_ID_ALIAS
@@ -1255,6 +1256,7 @@ def get_event_details(
12551256
organization_id=organization_id,
12561257
project_ids=project_ids,
12571258
),
1259+
eap_conditions=build_event_id_in_filter([event_id]),
12581260
limit=1,
12591261
tenant_ids={"organization_id": organization_id},
12601262
dataset=dataset,
@@ -1355,6 +1357,7 @@ def get_issue_and_event_details_v2(
13551357
organization_id=organization_id,
13561358
project_ids=project_ids,
13571359
),
1360+
eap_conditions=build_event_id_in_filter([event_id]),
13581361
limit=1,
13591362
tenant_ids={"organization_id": organization_id},
13601363
dataset=dataset,

src/sentry/services/eventstore/base.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
from typing import Any, Literal, Self, overload
88

99
import sentry_sdk
10+
from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter
1011
from snuba_sdk import Condition
1112

1213
from sentry import nodestore
1314
from sentry.services.eventstore.models import Event, GroupEvent
1415
from sentry.snuba.dataset import Dataset
1516
from sentry.snuba.events import Columns
17+
from sentry.snuba.referrer import Referrer
1618
from sentry.utils.services import Service
1719

1820

@@ -166,10 +168,11 @@ class EventStorage(Service):
166168
def get_events(
167169
self,
168170
filter: Filter,
171+
eap_conditions: TraceItemFilter | None = None,
169172
orderby: Sequence[str] | None = None,
170173
limit: int = 100,
171174
offset: int = 0,
172-
referrer: str = "eventstore.get_events",
175+
referrer: str = Referrer.EVENTSTORE_GET_EVENTS.value,
173176
dataset: Dataset = Dataset.Events,
174177
tenant_ids: Mapping[str, Any] | None = None,
175178
) -> list[Event]:
@@ -180,11 +183,12 @@ def get_events(
180183
transaction events. Returns an empty list if no events match the filter.
181184
182185
Arguments:
183-
snuba_filter (Filter): Filter
186+
filter (Filter): Snuba query filter
187+
eap_conditions (TraceItemFilter | None): EAP query conditions
184188
orderby (Sequence[str]): List of fields to order by - default ['-time', '-event_id']
185189
limit (int): Query limit - default 100
186190
offset (int): Query offset - default 0
187-
referrer (string): Referrer - default "eventstore.get_events"
191+
referrer (string): Referrer
188192
"""
189193
raise NotImplementedError
190194

@@ -208,28 +212,30 @@ def get_events_snql(
208212
def get_unfetched_events(
209213
self,
210214
filter: Filter,
215+
eap_conditions: TraceItemFilter | None = None,
211216
orderby: Sequence[str] | None = None,
212217
limit: int = 100,
213218
offset: int = 0,
214-
referrer: str = "eventstore.get_unfetched_events",
219+
referrer: str = Referrer.EVENTSTORE_GET_UNFETCHED_EVENTS.value,
215220
dataset: Dataset = Dataset.Events,
216221
tenant_ids: Mapping[str, Any] | None = None,
217222
) -> list[Event]:
218223
"""
219-
Same as get_events but returns events without their node datas loaded.
220-
Only the event ID, projectID, groupID and timestamp field will be present without
221-
an additional fetch to nodestore.
224+
Same as get_events but returns events without their node data loaded.
225+
Only the event ID, project ID, group ID, and timestamp fields will be present
226+
without an additional fetch to nodestore.
222227
223228
Used for fetching large volumes of events that do not need data loaded
224229
from nodestore. Currently this is just used for event data deletions where
225230
we just need the event IDs in order to process the deletions.
226231
227232
Arguments:
228-
snuba_filter (Filter): Filter
233+
filter (Filter): Snuba query filter
234+
eap_conditions (TraceItemFilter | None): EAP query conditions
229235
orderby (Sequence[str]): List of fields to order by - default ['-time', '-event_id']
230236
limit (int): Query limit - default 100
231237
offset (int): Query offset - default 0
232-
referrer (string): Referrer - default "eventstore.get_unfetched_events"
238+
referrer (string): Referrer
233239
"""
234240
raise NotImplementedError
235241

0 commit comments

Comments
 (0)