Skip to content

Commit 3e509b5

Browse files
yuvmen and claude committed
feat(seer): Add lightweight supergroups backfill task
Add an org-scoped Celery task that iterates all error groups in an organization (seen in last 90 days) and sends each to Seer's lightweight RCA clustering endpoint for supergroup backfilling. The task processes groups in batches of 50 with cursor-based pagination and self-chains until all groups are processed. Designed to be triggered from a getsentry job. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7b309e4 commit 3e509b5

File tree

3 files changed

+373
-0
lines changed

3 files changed

+373
-0
lines changed

src/sentry/options/defaults.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1385,6 +1385,13 @@
13851385
flags=FLAG_AUTOMATOR_MODIFIABLE,
13861386
)
13871387

1388+
# Killswitch for the lightweight supergroups backfill task. When this option
# is truthy, backfill_supergroups_lightweight_for_org logs
# "supergroups_backfill_lightweight.killswitch_enabled" and returns before
# doing any work. Defaults to False (backfill allowed to run).
register(
1389+
"seer.supergroups_backfill_lightweight.killswitch",
1390+
type=Bool,
1391+
default=False,
1392+
flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
1393+
)
1394+
13881395
# ## sentry.killswitches
13891396
#
13901397
# The following options are documented in sentry.killswitches in more detail
src/sentry/tasks/seer/backfill_supergroups_lightweight.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import logging
2+
from datetime import UTC, datetime, timedelta
3+
4+
from sentry import features, options
5+
from sentry.api.serializers import EventSerializer, serialize
6+
from sentry.eventstore import backend as eventstore
7+
from sentry.models.group import DEFAULT_TYPE_ID, Group
8+
from sentry.models.organization import Organization
9+
from sentry.models.project import Project
10+
from sentry.seer.signed_seer_api import (
11+
LightweightRCAClusterRequest,
12+
SeerViewerContext,
13+
make_lightweight_rca_cluster_request,
14+
)
15+
from sentry.tasks.base import instrumented_task
16+
from sentry.taskworker.namespaces import seer_tasks
17+
from sentry.types.group import GroupSubStatus
18+
from sentry.utils import metrics
19+
20+
logger = logging.getLogger(__name__)
21+
22+
BACKFILL_LAST_SEEN_DAYS = 90
23+
BATCH_SIZE = 50
24+
INTER_BATCH_DELAY_S = 5
25+
26+
27+
@instrumented_task(
28+
name="sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org",
29+
namespace=seer_tasks,
30+
processing_deadline_duration=15 * 60,
31+
)
32+
def backfill_supergroups_lightweight_for_org(
33+
organization_id: int,
34+
last_project_id: int = 0,
35+
last_group_id: int = 0,
36+
**kwargs,
37+
) -> None:
38+
if options.get("seer.supergroups_backfill_lightweight.killswitch"):
39+
logger.info("supergroups_backfill_lightweight.killswitch_enabled")
40+
return
41+
42+
try:
43+
organization = Organization.objects.get(id=organization_id)
44+
except Organization.DoesNotExist:
45+
return
46+
47+
if not features.has("organizations:supergroups-lightweight-rca-clustering", organization):
48+
logger.info(
49+
"supergroups_backfill_lightweight.feature_not_enabled",
50+
extra={"organization_id": organization_id},
51+
)
52+
return
53+
54+
project_ids = list(
55+
Project.objects.filter(
56+
organization_id=organization_id,
57+
id__gte=last_project_id,
58+
)
59+
.order_by("id")
60+
.values_list("id", flat=True)
61+
)
62+
63+
if not project_ids:
64+
logger.info(
65+
"supergroups_backfill_lightweight.org_completed",
66+
extra={"organization_id": organization_id},
67+
)
68+
return
69+
70+
cutoff = datetime.now(UTC) - timedelta(days=BACKFILL_LAST_SEEN_DAYS)
71+
72+
group_filter = Group.objects.filter(
73+
project_id__in=project_ids,
74+
type=DEFAULT_TYPE_ID,
75+
last_seen__gte=cutoff,
76+
substatus__in=[
77+
GroupSubStatus.ONGOING,
78+
GroupSubStatus.NEW,
79+
GroupSubStatus.ESCALATING,
80+
GroupSubStatus.REGRESSED,
81+
],
82+
)
83+
84+
if last_group_id > 0:
85+
group_filter = group_filter.filter(
86+
project_id=last_project_id, id__gt=last_group_id
87+
) | group_filter.filter(project_id__gt=last_project_id)
88+
else:
89+
group_filter = group_filter.filter(project_id__gte=last_project_id)
90+
91+
groups = list(
92+
group_filter.select_related("project", "project__organization").order_by(
93+
"project_id", "id"
94+
)[:BATCH_SIZE]
95+
)
96+
97+
if not groups:
98+
logger.info(
99+
"supergroups_backfill_lightweight.org_completed",
100+
extra={"organization_id": organization_id},
101+
)
102+
return
103+
104+
# Phase 1: Batch fetch event data
105+
group_event_pairs: list[tuple[Group, dict]] = []
106+
for group in groups:
107+
event = group.get_latest_event()
108+
if not event:
109+
continue
110+
111+
ready_event = eventstore.get_event_by_id(
112+
group.project_id, event.event_id, group_id=group.id
113+
)
114+
if not ready_event:
115+
continue
116+
117+
serialized_event = serialize(ready_event, None, EventSerializer())
118+
group_event_pairs.append((group, serialized_event))
119+
120+
# Phase 2: Send to Seer (per-group for now, bulk-ready)
121+
failure_count = 0
122+
success_count = 0
123+
viewer_context = SeerViewerContext(organization_id=organization_id)
124+
125+
for group, serialized_event in group_event_pairs:
126+
try:
127+
body = LightweightRCAClusterRequest(
128+
group_id=group.id,
129+
issue={
130+
"id": group.id,
131+
"title": group.title,
132+
"short_id": group.qualified_short_id,
133+
"events": [serialized_event],
134+
},
135+
organization_slug=organization.slug,
136+
organization_id=organization_id,
137+
project_id=group.project_id,
138+
)
139+
response = make_lightweight_rca_cluster_request(
140+
body, timeout=30, viewer_context=viewer_context
141+
)
142+
if response.status >= 400:
143+
logger.warning(
144+
"supergroups_backfill_lightweight.seer_error",
145+
extra={
146+
"group_id": group.id,
147+
"project_id": group.project_id,
148+
"status": response.status,
149+
},
150+
)
151+
failure_count += 1
152+
else:
153+
success_count += 1
154+
except Exception:
155+
logger.exception(
156+
"supergroups_backfill_lightweight.group_failed",
157+
extra={"group_id": group.id, "project_id": group.project_id},
158+
)
159+
failure_count += 1
160+
161+
metrics.incr(
162+
"seer.supergroups_backfill_lightweight.groups_processed",
163+
amount=success_count,
164+
)
165+
metrics.incr(
166+
"seer.supergroups_backfill_lightweight.groups_failed",
167+
amount=failure_count,
168+
)
169+
170+
# Self-chain if there are more groups to process
171+
if len(groups) == BATCH_SIZE:
172+
last_group = groups[-1]
173+
backfill_supergroups_lightweight_for_org.apply_async(
174+
args=[organization_id],
175+
kwargs={
176+
"last_project_id": last_group.project_id,
177+
"last_group_id": last_group.id,
178+
},
179+
countdown=INTER_BATCH_DELAY_S,
180+
headers={"sentry-propagate-traces": False},
181+
)
182+
else:
183+
logger.info(
184+
"supergroups_backfill_lightweight.org_completed",
185+
extra={"organization_id": organization_id},
186+
)
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
from datetime import UTC, datetime, timedelta
2+
from unittest.mock import MagicMock, patch
3+
4+
from sentry.models.group import DEFAULT_TYPE_ID
5+
from sentry.tasks.seer.backfill_supergroups_lightweight import (
6+
BATCH_SIZE,
7+
backfill_supergroups_lightweight_for_org,
8+
)
9+
from sentry.testutils.cases import TestCase
10+
from sentry.testutils.helpers.features import with_feature
11+
from sentry.types.group import GroupSubStatus
12+
13+
14+
class BackfillSupergroupsLightweightForOrgTest(TestCase):
    """Tests for the org-scoped lightweight supergroups backfill task."""

    # Shared literals, hoisted so each test's decorators stay short.
    FEATURE = "organizations:supergroups-lightweight-rca-clustering"
    SEER_REQUEST_PATH = (
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    CHAIN_PATH = "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async"

    def _store_new_group(self, project_id, data):
        """Store an event and mark its group's substatus as NEW; return the event."""
        stored = self.store_event(data=data, project_id=project_id)
        grp = stored.group
        grp.substatus = GroupSubStatus.NEW
        grp.save(update_fields=["substatus"])
        return stored

    def setUp(self):
        super().setUp()
        self.event = self._store_new_group(
            self.project.id, {"message": "test error", "level": "error"}
        )
        self.group = self.event.group

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_processes_groups_and_sends_to_seer(self, mock_seer):
        """A single eligible group is sent to Seer with the expected payload."""
        mock_seer.return_value = MagicMock(status=200)

        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_seer.assert_called_once()
        payload = mock_seer.call_args.args[0]
        assert payload["group_id"] == self.group.id
        assert payload["project_id"] == self.project.id
        assert payload["organization_id"] == self.organization.id
        assert payload["issue"]["id"] == self.group.id
        assert len(payload["issue"]["events"]) == 1

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_processes_groups_across_projects(self, mock_seer):
        """Groups from every project in the organization are sent."""
        mock_seer.return_value = MagicMock(status=200)

        other_project = self.create_project(organization=self.organization)
        self._store_new_group(
            other_project.id, {"message": "error in project2", "level": "error"}
        )

        backfill_supergroups_lightweight_for_org(self.organization.id)

        assert mock_seer.call_count == 2
        seen_project_ids = {c.args[0]["project_id"] for c in mock_seer.call_args_list}
        assert seen_project_ids == {self.project.id, other_project.id}

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_self_chains_when_more_groups_exist(self, mock_seer):
        """A full batch schedules a follow-up task carrying the cursor."""
        mock_seer.return_value = MagicMock(status=200)

        # Create enough groups to fill a batch
        for idx in range(BATCH_SIZE):
            self._store_new_group(
                self.project.id,
                {
                    "message": f"error {idx}",
                    "level": "error",
                    "fingerprint": [f"group-{idx}"],
                },
            )

        with patch(self.CHAIN_PATH) as mock_chain:
            backfill_supergroups_lightweight_for_org(self.organization.id)

            mock_chain.assert_called_once()
            cursor = mock_chain.call_args.kwargs["kwargs"]
            assert cursor["last_project_id"] == self.project.id
            assert cursor["last_group_id"] > 0

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_does_not_chain_when_batch_incomplete(self, mock_seer):
        """A partial batch ends the backfill without re-scheduling."""
        mock_seer.return_value = MagicMock(status=200)

        with patch(self.CHAIN_PATH) as mock_chain:
            backfill_supergroups_lightweight_for_org(self.organization.id)

            mock_chain.assert_not_called()

    @patch(SEER_REQUEST_PATH)
    def test_respects_killswitch(self, mock_seer):
        """With the killswitch option on, Seer is never called."""
        with self.options({"seer.supergroups_backfill_lightweight.killswitch": True}):
            backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_seer.assert_not_called()

    @patch(SEER_REQUEST_PATH)
    def test_skips_without_feature_flag(self, mock_seer):
        """Without the feature flag, Seer is never called."""
        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_seer.assert_not_called()

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_continues_on_individual_group_failure(self, mock_seer):
        """An error response for one group does not stop the next group."""
        self._store_new_group(
            self.project.id,
            {"message": "second error", "level": "error", "fingerprint": ["group2"]},
        )

        mock_seer.side_effect = [
            MagicMock(status=500),
            MagicMock(status=200),
        ]

        backfill_supergroups_lightweight_for_org(self.organization.id)

        assert mock_seer.call_count == 2

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_filters_old_groups(self, mock_seer):
        """Groups last seen more than 90 days ago are not sent."""
        self.group.last_seen = datetime.now(UTC) - timedelta(days=91)
        self.group.save(update_fields=["last_seen"])

        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_seer.assert_not_called()

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_skips_non_error_groups(self, mock_seer):
        """Groups whose type is not DEFAULT_TYPE_ID are not sent."""
        self.group.type = DEFAULT_TYPE_ID + 1
        self.group.save(update_fields=["type"])

        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_seer.assert_not_called()

    @with_feature(FEATURE)
    @patch(SEER_REQUEST_PATH)
    def test_resumes_from_cursor(self, mock_seer):
        """Groups at or before the cursor are not re-sent."""
        backfill_supergroups_lightweight_for_org(
            self.organization.id,
            last_project_id=self.project.id,
            last_group_id=self.group.id,
        )

        mock_seer.assert_not_called()

0 commit comments

Comments
 (0)