|
1 | 1 | import logging |
| 2 | +from collections.abc import Sequence |
2 | 3 | from datetime import UTC, datetime, timedelta |
3 | 4 |
|
| 5 | +from snuba_sdk import Column, Condition, Direction, Entity, Limit, Op, OrderBy, Query, Request |
| 6 | + |
4 | 7 | from sentry import features, options |
5 | 8 | from sentry.api.serializers import EventSerializer, serialize |
6 | 9 | from sentry.eventstore import backend as eventstore |
|
12 | 15 | SeerViewerContext, |
13 | 16 | make_lightweight_rca_cluster_request, |
14 | 17 | ) |
| 18 | +from sentry.snuba.dataset import Dataset |
15 | 19 | from sentry.tasks.base import instrumented_task |
16 | 20 | from sentry.taskworker.namespaces import seer_tasks |
17 | 21 | from sentry.types.group import GroupSubStatus |
18 | 22 | from sentry.utils import metrics |
| 23 | +from sentry.utils.snuba import bulk_snuba_queries |
19 | 24 |
|
20 | 25 | logger = logging.getLogger(__name__) |
21 | 26 |
|
# How far back (in days) to search for a group's latest event when no
# tighter window is available.
BACKFILL_LAST_SEEN_DAYS = 90
# Batch size for processing — presumably groups per batch; confirm against
# the task body, which is not fully visible here.
BATCH_SIZE = 50
# Seconds to sleep between batches; lowered from 5 to 1 now that event
# lookups are batched into bulk Snuba queries.
INTER_BATCH_DELAY_S = 1
25 | 30 |
|
26 | 31 |
|
27 | 32 | @instrumented_task( |
@@ -102,20 +107,7 @@ def backfill_supergroups_lightweight_for_org( |
102 | 107 | return |
103 | 108 |
|
104 | 109 | # Phase 1: Batch fetch event data |
105 | | - group_event_pairs: list[tuple[Group, dict]] = [] |
106 | | - for group in groups: |
107 | | - event = group.get_latest_event() |
108 | | - if not event: |
109 | | - continue |
110 | | - |
111 | | - ready_event = eventstore.get_event_by_id( |
112 | | - group.project_id, event.event_id, group_id=group.id |
113 | | - ) |
114 | | - if not ready_event: |
115 | | - continue |
116 | | - |
117 | | - serialized_event = serialize(ready_event, None, EventSerializer()) |
118 | | - group_event_pairs.append((group, serialized_event)) |
| 110 | + group_event_pairs = _batch_fetch_events(groups, organization_id) |
119 | 111 |
|
120 | 112 | # Phase 2: Send to Seer (per-group for now, bulk-ready) |
121 | 113 | failure_count = 0 |
@@ -184,3 +176,59 @@ def backfill_supergroups_lightweight_for_org( |
184 | 176 | "supergroups_backfill_lightweight.org_completed", |
185 | 177 | extra={"organization_id": organization_id}, |
186 | 178 | ) |
| 179 | + |
| 180 | + |
def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[tuple[Group, dict]]:
    """
    Resolve each group's most recent event via one bulk Snuba call, then
    load the full events from the event store and serialize them for Seer.

    Returns (group, serialized_event) pairs; groups with no matching Snuba
    row or no retrievable event are silently skipped.
    """
    query_end = datetime.now(UTC)
    fallback_start = query_end - timedelta(days=BACKFILL_LAST_SEEN_DAYS)

    def _latest_event_request(group: Group) -> Request:
        # Anchor a narrow scan window on last_seen when we have it; otherwise
        # fall back to the full backfill lookback window.
        if group.last_seen:
            window_start = group.last_seen - timedelta(hours=1)
        else:
            window_start = fallback_start
        return Request(
            dataset=Dataset.Events.value,
            app_id="supergroups_backfill",
            query=Query(
                match=Entity(Dataset.Events.value),
                select=[Column("event_id"), Column("group_id"), Column("project_id")],
                where=[
                    Condition(Column("project_id"), Op.EQ, group.project_id),
                    Condition(Column("group_id"), Op.EQ, group.id),
                    Condition(Column("timestamp"), Op.GTE, window_start),
                    # Allow a little forward clock skew so just-written rows count.
                    Condition(Column("timestamp"), Op.LT, query_end + timedelta(minutes=5)),
                ],
                orderby=[OrderBy(Column("timestamp"), Direction.DESC)],
                limit=Limit(1),
            ),
            tenant_ids={"organization_id": organization_id},
        )

    # One request per group, answered in the same order by Snuba.
    results = bulk_snuba_queries(
        [_latest_event_request(group) for group in groups],
        referrer="supergroups_backfill_lightweight.get_latest_events",
    )

    pairs: list[tuple[Group, dict]] = []
    for group, result in zip(groups, results):
        rows = result.get("data", [])
        if not rows:
            continue
        # Hydrate the full event payload from nodestore before serializing.
        ready_event = eventstore.get_event_by_id(
            group.project_id, rows[0]["event_id"], group_id=group.id
        )
        if ready_event:
            pairs.append((group, serialize(ready_event, None, EventSerializer())))
    return pairs
0 commit comments