From 7b309e401d8e61d371cfb6e6143faf822b5fe991 Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Tue, 7 Apr 2026 16:52:57 -0700 Subject: [PATCH 1/9] feat(seer): Add rca_source to supergroup queries and use feature flag Add RCASource enum and rca_source field to supergroup query requests so Seer knows which embedding space to query. The source is determined by the organizations:supergroups-lightweight-rca-clustering feature flag. Replace the supergroups.lightweight-enabled-orgs sentry-option with the feature flag for both the write path (post_process task dispatch) and read path (supergroup query endpoints), consistent with how all other supergroup features are gated. --- src/sentry/features/temporary.py | 2 + src/sentry/options/defaults.py | 8 --- src/sentry/seer/signed_seer_api.py | 8 +++ .../organization_supergroup_details.py | 14 +++- .../organization_supergroups_by_group.py | 9 ++- src/sentry/tasks/post_process.py | 3 +- .../test_organization_supergroup_details.py | 67 +++++++++++++++++++ .../test_organization_supergroups_by_group.py | 35 ++++++++++ tests/sentry/tasks/test_post_process.py | 17 +++-- 9 files changed, 141 insertions(+), 22 deletions(-) create mode 100644 tests/sentry/seer/supergroups/endpoints/test_organization_supergroup_details.py diff --git a/src/sentry/features/temporary.py b/src/sentry/features/temporary.py index f48a566b091fee..c589432368cc03 100644 --- a/src/sentry/features/temporary.py +++ b/src/sentry/features/temporary.py @@ -571,6 +571,8 @@ def register_temporary_features(manager: FeatureManager) -> None: manager.add("projects:supergroup-embeddings-explorer", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) # Enable lightweight Explorer RCA runs for supergroup quality evaluation manager.add("projects:supergroup-lightweight-rca", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) + # Use lightweight RCA source for supergroup clustering and queries + 
manager.add("organizations:supergroups-lightweight-rca-clustering", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) manager.add("projects:workflow-engine-performance-detectors", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 8725c6ad5c15d0..c58843c709136d 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -1385,14 +1385,6 @@ flags=FLAG_AUTOMATOR_MODIFIABLE, ) -# Supergroups / Lightweight RCA -register( - "supergroups.lightweight-enabled-orgs", - type=Sequence, - default=[], - flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE, -) - # ## sentry.killswitches # # The following options are documented in sentry.killswitches in more detail diff --git a/src/sentry/seer/signed_seer_api.py b/src/sentry/seer/signed_seer_api.py index 33752a24de8b8b..83f8e4f4489271 100644 --- a/src/sentry/seer/signed_seer_api.py +++ b/src/sentry/seer/signed_seer_api.py @@ -1,6 +1,7 @@ import hashlib import hmac import logging +from enum import StrEnum from typing import Any, NotRequired, TypedDict from urllib.parse import urlparse @@ -369,6 +370,11 @@ class SummarizeIssueRequest(TypedDict): experiment_variant: NotRequired[str | None] +class RCASource(StrEnum): + EXPLORER = "explorer" + LIGHTWEIGHT = "lightweight" + + class SupergroupsEmbeddingRequest(TypedDict): organization_id: int group_id: int @@ -387,11 +393,13 @@ class LightweightRCAClusterRequest(TypedDict): class SupergroupsGetRequest(TypedDict): organization_id: int supergroup_id: int + rca_source: str class SupergroupsGetByGroupIdsRequest(TypedDict): organization_id: int group_ids: list[int] + rca_source: str class SupergroupDetailData(TypedDict): diff --git a/src/sentry/seer/supergroups/endpoints/organization_supergroup_details.py b/src/sentry/seer/supergroups/endpoints/organization_supergroup_details.py index ab03c3fe13099a..e809056257abab 100644 --- 
a/src/sentry/seer/supergroups/endpoints/organization_supergroup_details.py +++ b/src/sentry/seer/supergroups/endpoints/organization_supergroup_details.py @@ -12,7 +12,7 @@ from sentry.api.base import cell_silo_endpoint from sentry.api.bases.organization import OrganizationEndpoint, OrganizationPermission from sentry.models.organization import Organization -from sentry.seer.signed_seer_api import SeerViewerContext, make_supergroups_get_request +from sentry.seer.signed_seer_api import RCASource, SeerViewerContext, make_supergroups_get_request logger = logging.getLogger(__name__) @@ -35,8 +35,18 @@ def get(self, request: Request, organization: Organization, supergroup_id: int) if not features.has("organizations:top-issues-ui", organization, actor=request.user): return Response({"detail": "Feature not available"}, status=403) + rca_source = ( + RCASource.LIGHTWEIGHT + if features.has("organizations:supergroups-lightweight-rca-clustering", organization) + else RCASource.EXPLORER + ) + response = make_supergroups_get_request( - {"organization_id": organization.id, "supergroup_id": supergroup_id}, + { + "organization_id": organization.id, + "supergroup_id": supergroup_id, + "rca_source": rca_source, + }, SeerViewerContext(organization_id=organization.id, user_id=request.user.id), timeout=10, ) diff --git a/src/sentry/seer/supergroups/endpoints/organization_supergroups_by_group.py b/src/sentry/seer/supergroups/endpoints/organization_supergroups_by_group.py index fc2c50de7a59be..6505be0774c0b4 100644 --- a/src/sentry/seer/supergroups/endpoints/organization_supergroups_by_group.py +++ b/src/sentry/seer/supergroups/endpoints/organization_supergroups_by_group.py @@ -15,6 +15,7 @@ from sentry.models.group import STATUS_QUERY_CHOICES, Group from sentry.models.organization import Organization from sentry.seer.signed_seer_api import ( + RCASource, SeerViewerContext, SupergroupsByGroupIdsResponse, make_supergroups_get_by_group_ids_request, @@ -77,8 +78,14 @@ def get(self, request: 
Request, organization: Organization) -> Response: status=status_codes.HTTP_404_NOT_FOUND, ) + rca_source = ( + RCASource.LIGHTWEIGHT + if features.has("organizations:supergroups-lightweight-rca-clustering", organization) + else RCASource.EXPLORER + ) + response = make_supergroups_get_by_group_ids_request( - {"organization_id": organization.id, "group_ids": group_ids}, + {"organization_id": organization.id, "group_ids": group_ids, "rca_source": rca_source}, SeerViewerContext(organization_id=organization.id, user_id=request.user.id), timeout=10, ) diff --git a/src/sentry/tasks/post_process.py b/src/sentry/tasks/post_process.py index a7d5ac582ad224..4650dccb7f9062 100644 --- a/src/sentry/tasks/post_process.py +++ b/src/sentry/tasks/post_process.py @@ -1593,8 +1593,7 @@ def kick_off_lightweight_rca_cluster(job: PostProcessJob) -> None: event = job["event"] group = event.group - enabled_orgs: list[int] = options.get("supergroups.lightweight-enabled-orgs") - if group.organization.id not in enabled_orgs: + if not features.has("organizations:supergroups-lightweight-rca-clustering", group.organization): return trigger_lightweight_rca_cluster_task.delay(group.id) diff --git a/tests/sentry/seer/supergroups/endpoints/test_organization_supergroup_details.py b/tests/sentry/seer/supergroups/endpoints/test_organization_supergroup_details.py new file mode 100644 index 00000000000000..e94f10f83c3852 --- /dev/null +++ b/tests/sentry/seer/supergroups/endpoints/test_organization_supergroup_details.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from typing import Any +from unittest.mock import MagicMock, patch + +import orjson + +from sentry.testutils.cases import APITestCase + + +def mock_seer_response(data: dict[str, Any]) -> MagicMock: + response = MagicMock() + response.status = 200 + response.data = orjson.dumps(data) + return response + + +class OrganizationSupergroupDetailsEndpointTest(APITestCase): + endpoint = "sentry-api-0-organization-supergroup-details" + + def 
setUp(self): + super().setUp() + self.login_as(self.user) + + @patch( + "sentry.seer.supergroups.endpoints.organization_supergroup_details.make_supergroups_get_request" + ) + def test_get_supergroup_details(self, mock_seer): + mock_seer.return_value = mock_seer_response( + {"id": 1, "title": "NullPointerException in auth", "group_ids": [10, 20]} + ) + + with self.feature("organizations:top-issues-ui"): + response = self.get_success_response(self.organization.slug, "1") + + assert response.data["id"] == 1 + assert response.data["title"] == "NullPointerException in auth" + assert response.data["group_ids"] == [10, 20] + + @patch( + "sentry.seer.supergroups.endpoints.organization_supergroup_details.make_supergroups_get_request" + ) + def test_rca_source_defaults_to_explorer(self, mock_seer): + mock_seer.return_value = mock_seer_response({"id": 1, "title": "test"}) + + with self.feature("organizations:top-issues-ui"): + self.get_success_response(self.organization.slug, "1") + + body = mock_seer.call_args.args[0] + assert body["rca_source"] == "explorer" + + @patch( + "sentry.seer.supergroups.endpoints.organization_supergroup_details.make_supergroups_get_request" + ) + def test_rca_source_lightweight_when_flag_enabled(self, mock_seer): + mock_seer.return_value = mock_seer_response({"id": 1, "title": "test"}) + + with self.feature( + { + "organizations:top-issues-ui": True, + "organizations:supergroups-lightweight-rca-clustering": True, + } + ): + self.get_success_response(self.organization.slug, "1") + + body = mock_seer.call_args.args[0] + assert body["rca_source"] == "lightweight" diff --git a/tests/sentry/seer/supergroups/endpoints/test_organization_supergroups_by_group.py b/tests/sentry/seer/supergroups/endpoints/test_organization_supergroups_by_group.py index 4419d824602847..d6ce658e370303 100644 --- a/tests/sentry/seer/supergroups/endpoints/test_organization_supergroups_by_group.py +++ 
b/tests/sentry/seer/supergroups/endpoints/test_organization_supergroups_by_group.py @@ -74,3 +74,38 @@ def test_status_filter_invalid(self): status="bogus", status_code=400, ) + + @patch( + "sentry.seer.supergroups.endpoints.organization_supergroups_by_group.make_supergroups_get_by_group_ids_request" + ) + def test_rca_source_defaults_to_explorer(self, mock_seer): + mock_seer.return_value = mock_seer_response({"data": []}) + + with self.feature("organizations:top-issues-ui"): + self.get_success_response( + self.organization.slug, + group_id=[self.unresolved_group.id], + ) + + body = mock_seer.call_args.args[0] + assert body["rca_source"] == "explorer" + + @patch( + "sentry.seer.supergroups.endpoints.organization_supergroups_by_group.make_supergroups_get_by_group_ids_request" + ) + def test_rca_source_lightweight_when_flag_enabled(self, mock_seer): + mock_seer.return_value = mock_seer_response({"data": []}) + + with self.feature( + { + "organizations:top-issues-ui": True, + "organizations:supergroups-lightweight-rca-clustering": True, + } + ): + self.get_success_response( + self.organization.slug, + group_id=[self.unresolved_group.id], + ) + + body = mock_seer.call_args.args[0] + assert body["rca_source"] == "lightweight" diff --git a/tests/sentry/tasks/test_post_process.py b/tests/sentry/tasks/test_post_process.py index 9ee29b5d7fc7b1..0f0555be84c4dd 100644 --- a/tests/sentry/tasks/test_post_process.py +++ b/tests/sentry/tasks/test_post_process.py @@ -3078,7 +3078,7 @@ def test_kick_off_lightweight_rca_cluster_when_enabled(self, mock_task): project_id=self.project.id, ) - with self.options({"supergroups.lightweight-enabled-orgs": [self.project.organization.id]}): + with self.feature("organizations:supergroups-lightweight-rca-clustering"): self.call_post_process_group( is_new=True, is_regression=False, @@ -3095,13 +3095,12 @@ def test_kick_off_lightweight_rca_cluster_skips_when_not_enabled(self, mock_task project_id=self.project.id, ) - with 
self.options({"supergroups.lightweight-enabled-orgs": []}): - self.call_post_process_group( - is_new=True, - is_regression=False, - is_new_group_environment=True, - event=event, - ) + self.call_post_process_group( + is_new=True, + is_regression=False, + is_new_group_environment=True, + event=event, + ) mock_task.assert_not_called() @@ -3112,7 +3111,7 @@ def test_kick_off_lightweight_rca_cluster_skips_when_not_new(self, mock_task): project_id=self.project.id, ) - with self.options({"supergroups.lightweight-enabled-orgs": [self.project.organization.id]}): + with self.feature("organizations:supergroups-lightweight-rca-clustering"): self.call_post_process_group( is_new=False, is_regression=False, From 3e509b5cbb5ad6d75118cd469c73feb95991f39e Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Wed, 8 Apr 2026 12:35:58 -0700 Subject: [PATCH 2/9] feat(seer): Add lightweight supergroups backfill task Add an org-scoped Celery task that iterates all error groups in an organization (seen in last 90 days) and sends each to Seer's lightweight RCA clustering endpoint for supergroup backfilling. The task processes groups in batches of 50 with cursor-based pagination and self-chains until all groups are processed. Designed to be triggered from a getsentry job. 
Co-Authored-By: Claude Opus 4.6 --- src/sentry/options/defaults.py | 7 + .../seer/backfill_supergroups_lightweight.py | 186 ++++++++++++++++++ .../test_backfill_supergroups_lightweight.py | 180 +++++++++++++++++ 3 files changed, 373 insertions(+) create mode 100644 src/sentry/tasks/seer/backfill_supergroups_lightweight.py create mode 100644 tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index c58843c709136d..04f3af38370b09 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -1385,6 +1385,13 @@ flags=FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "seer.supergroups_backfill_lightweight.killswitch", + type=Bool, + default=False, + flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE, +) + # ## sentry.killswitches # # The following options are documented in sentry.killswitches in more detail diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py new file mode 100644 index 00000000000000..8607a96bf74da6 --- /dev/null +++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py @@ -0,0 +1,186 @@ +import logging +from datetime import UTC, datetime, timedelta + +from sentry import features, options +from sentry.api.serializers import EventSerializer, serialize +from sentry.eventstore import backend as eventstore +from sentry.models.group import DEFAULT_TYPE_ID, Group +from sentry.models.organization import Organization +from sentry.models.project import Project +from sentry.seer.signed_seer_api import ( + LightweightRCAClusterRequest, + SeerViewerContext, + make_lightweight_rca_cluster_request, +) +from sentry.tasks.base import instrumented_task +from sentry.taskworker.namespaces import seer_tasks +from sentry.types.group import GroupSubStatus +from sentry.utils import metrics + +logger = logging.getLogger(__name__) + +BACKFILL_LAST_SEEN_DAYS = 90 +BATCH_SIZE = 50 
+INTER_BATCH_DELAY_S = 5 + + +@instrumented_task( + name="sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org", + namespace=seer_tasks, + processing_deadline_duration=15 * 60, +) +def backfill_supergroups_lightweight_for_org( + organization_id: int, + last_project_id: int = 0, + last_group_id: int = 0, + **kwargs, +) -> None: + if options.get("seer.supergroups_backfill_lightweight.killswitch"): + logger.info("supergroups_backfill_lightweight.killswitch_enabled") + return + + try: + organization = Organization.objects.get(id=organization_id) + except Organization.DoesNotExist: + return + + if not features.has("organizations:supergroups-lightweight-rca-clustering", organization): + logger.info( + "supergroups_backfill_lightweight.feature_not_enabled", + extra={"organization_id": organization_id}, + ) + return + + project_ids = list( + Project.objects.filter( + organization_id=organization_id, + id__gte=last_project_id, + ) + .order_by("id") + .values_list("id", flat=True) + ) + + if not project_ids: + logger.info( + "supergroups_backfill_lightweight.org_completed", + extra={"organization_id": organization_id}, + ) + return + + cutoff = datetime.now(UTC) - timedelta(days=BACKFILL_LAST_SEEN_DAYS) + + group_filter = Group.objects.filter( + project_id__in=project_ids, + type=DEFAULT_TYPE_ID, + last_seen__gte=cutoff, + substatus__in=[ + GroupSubStatus.ONGOING, + GroupSubStatus.NEW, + GroupSubStatus.ESCALATING, + GroupSubStatus.REGRESSED, + ], + ) + + if last_group_id > 0: + group_filter = group_filter.filter( + project_id=last_project_id, id__gt=last_group_id + ) | group_filter.filter(project_id__gt=last_project_id) + else: + group_filter = group_filter.filter(project_id__gte=last_project_id) + + groups = list( + group_filter.select_related("project", "project__organization").order_by( + "project_id", "id" + )[:BATCH_SIZE] + ) + + if not groups: + logger.info( + "supergroups_backfill_lightweight.org_completed", + 
extra={"organization_id": organization_id}, + ) + return + + # Phase 1: Batch fetch event data + group_event_pairs: list[tuple[Group, dict]] = [] + for group in groups: + event = group.get_latest_event() + if not event: + continue + + ready_event = eventstore.get_event_by_id( + group.project_id, event.event_id, group_id=group.id + ) + if not ready_event: + continue + + serialized_event = serialize(ready_event, None, EventSerializer()) + group_event_pairs.append((group, serialized_event)) + + # Phase 2: Send to Seer (per-group for now, bulk-ready) + failure_count = 0 + success_count = 0 + viewer_context = SeerViewerContext(organization_id=organization_id) + + for group, serialized_event in group_event_pairs: + try: + body = LightweightRCAClusterRequest( + group_id=group.id, + issue={ + "id": group.id, + "title": group.title, + "short_id": group.qualified_short_id, + "events": [serialized_event], + }, + organization_slug=organization.slug, + organization_id=organization_id, + project_id=group.project_id, + ) + response = make_lightweight_rca_cluster_request( + body, timeout=30, viewer_context=viewer_context + ) + if response.status >= 400: + logger.warning( + "supergroups_backfill_lightweight.seer_error", + extra={ + "group_id": group.id, + "project_id": group.project_id, + "status": response.status, + }, + ) + failure_count += 1 + else: + success_count += 1 + except Exception: + logger.exception( + "supergroups_backfill_lightweight.group_failed", + extra={"group_id": group.id, "project_id": group.project_id}, + ) + failure_count += 1 + + metrics.incr( + "seer.supergroups_backfill_lightweight.groups_processed", + amount=success_count, + ) + metrics.incr( + "seer.supergroups_backfill_lightweight.groups_failed", + amount=failure_count, + ) + + # Self-chain if there are more groups to process + if len(groups) == BATCH_SIZE: + last_group = groups[-1] + backfill_supergroups_lightweight_for_org.apply_async( + args=[organization_id], + kwargs={ + "last_project_id": 
last_group.project_id, + "last_group_id": last_group.id, + }, + countdown=INTER_BATCH_DELAY_S, + headers={"sentry-propagate-traces": False}, + ) + else: + logger.info( + "supergroups_backfill_lightweight.org_completed", + extra={"organization_id": organization_id}, + ) diff --git a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py new file mode 100644 index 00000000000000..16a904afa07752 --- /dev/null +++ b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py @@ -0,0 +1,180 @@ +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock, patch + +from sentry.models.group import DEFAULT_TYPE_ID +from sentry.tasks.seer.backfill_supergroups_lightweight import ( + BATCH_SIZE, + backfill_supergroups_lightweight_for_org, +) +from sentry.testutils.cases import TestCase +from sentry.testutils.helpers.features import with_feature +from sentry.types.group import GroupSubStatus + + +class BackfillSupergroupsLightweightForOrgTest(TestCase): + def setUp(self): + super().setUp() + self.event = self.store_event( + data={"message": "test error", "level": "error"}, + project_id=self.project.id, + ) + self.group = self.event.group + self.group.substatus = GroupSubStatus.NEW + self.group.save(update_fields=["substatus"]) + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_processes_groups_and_sends_to_seer(self, mock_request): + mock_request.return_value = MagicMock(status=200) + + backfill_supergroups_lightweight_for_org(self.organization.id) + + mock_request.assert_called_once() + body = mock_request.call_args.args[0] + assert body["group_id"] == self.group.id + assert body["project_id"] == self.project.id + assert body["organization_id"] == self.organization.id + assert body["issue"]["id"] == self.group.id + assert 
len(body["issue"]["events"]) == 1 + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_processes_groups_across_projects(self, mock_request): + mock_request.return_value = MagicMock(status=200) + + project2 = self.create_project(organization=self.organization) + event2 = self.store_event( + data={"message": "error in project2", "level": "error"}, + project_id=project2.id, + ) + event2.group.substatus = GroupSubStatus.NEW + event2.group.save(update_fields=["substatus"]) + + backfill_supergroups_lightweight_for_org(self.organization.id) + + assert mock_request.call_count == 2 + project_ids = {call.args[0]["project_id"] for call in mock_request.call_args_list} + assert project_ids == {self.project.id, project2.id} + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_self_chains_when_more_groups_exist(self, mock_request): + mock_request.return_value = MagicMock(status=200) + + # Create enough groups to fill a batch + for i in range(BATCH_SIZE): + evt = self.store_event( + data={ + "message": f"error {i}", + "level": "error", + "fingerprint": [f"group-{i}"], + }, + project_id=self.project.id, + ) + evt.group.substatus = GroupSubStatus.NEW + evt.group.save(update_fields=["substatus"]) + + with patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" + ) as mock_chain: + backfill_supergroups_lightweight_for_org(self.organization.id) + + mock_chain.assert_called_once() + call_kwargs = mock_chain.call_args.kwargs["kwargs"] + assert call_kwargs["last_project_id"] == self.project.id + assert call_kwargs["last_group_id"] > 0 + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + 
"sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_does_not_chain_when_batch_incomplete(self, mock_request): + mock_request.return_value = MagicMock(status=200) + + with patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" + ) as mock_chain: + backfill_supergroups_lightweight_for_org(self.organization.id) + + mock_chain.assert_not_called() + + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_respects_killswitch(self, mock_request): + with self.options({"seer.supergroups_backfill_lightweight.killswitch": True}): + backfill_supergroups_lightweight_for_org(self.organization.id) + + mock_request.assert_not_called() + + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_skips_without_feature_flag(self, mock_request): + backfill_supergroups_lightweight_for_org(self.organization.id) + + mock_request.assert_not_called() + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_continues_on_individual_group_failure(self, mock_request): + event2 = self.store_event( + data={"message": "second error", "level": "error", "fingerprint": ["group2"]}, + project_id=self.project.id, + ) + event2.group.substatus = GroupSubStatus.NEW + event2.group.save(update_fields=["substatus"]) + + mock_request.side_effect = [ + MagicMock(status=500), + MagicMock(status=200), + ] + + backfill_supergroups_lightweight_for_org(self.organization.id) + + assert mock_request.call_count == 2 + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_filters_old_groups(self, mock_request): + 
self.group.last_seen = datetime.now(UTC) - timedelta(days=91) + self.group.save(update_fields=["last_seen"]) + + backfill_supergroups_lightweight_for_org(self.organization.id) + + mock_request.assert_not_called() + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_skips_non_error_groups(self, mock_request): + self.group.type = DEFAULT_TYPE_ID + 1 + self.group.save(update_fields=["type"]) + + backfill_supergroups_lightweight_for_org(self.organization.id) + + mock_request.assert_not_called() + + @with_feature("organizations:supergroups-lightweight-rca-clustering") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def test_resumes_from_cursor(self, mock_request): + backfill_supergroups_lightweight_for_org( + self.organization.id, + last_project_id=self.project.id, + last_group_id=self.group.id, + ) + + mock_request.assert_not_called() From 694a5f864b052e7ff33dc451837b3e952a592588 Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Wed, 8 Apr 2026 13:18:53 -0700 Subject: [PATCH 3/9] fix(seer): Register backfill task and fix typing errors Add backfill_supergroups_lightweight to TASKWORKER_IMPORTS so the task is discovered in production. Fix mypy errors by asserting event.group is not None in tests. 
Co-Authored-By: Claude Opus 4.6 --- src/sentry/conf/server.py | 1 + .../sentry/tasks/seer/test_backfill_supergroups_lightweight.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 938539fc20afdb..15c8f7ab54e9ad 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -992,6 +992,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: "sentry.tasks.seer.context_engine_index", "sentry.tasks.seer.lightweight_rca_cluster", "sentry.tasks.seer.night_shift", + "sentry.tasks.seer.backfill_supergroups_lightweight", # Used for tests "sentry.taskworker.tasks.examples", ) diff --git a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py index 16a904afa07752..6aeb74e6e4e408 100644 --- a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py +++ b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py @@ -51,6 +51,7 @@ def test_processes_groups_across_projects(self, mock_request): data={"message": "error in project2", "level": "error"}, project_id=project2.id, ) + assert event2.group is not None event2.group.substatus = GroupSubStatus.NEW event2.group.save(update_fields=["substatus"]) @@ -77,6 +78,7 @@ def test_self_chains_when_more_groups_exist(self, mock_request): }, project_id=self.project.id, ) + assert evt.group is not None evt.group.substatus = GroupSubStatus.NEW evt.group.save(update_fields=["substatus"]) @@ -130,6 +132,7 @@ def test_continues_on_individual_group_failure(self, mock_request): data={"message": "second error", "level": "error", "fingerprint": ["group2"]}, project_id=self.project.id, ) + assert event2.group is not None event2.group.substatus = GroupSubStatus.NEW event2.group.save(update_fields=["substatus"]) From 4fdfd88562992b3812c65d97ec6f3e915392d417 Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Wed, 8 Apr 2026 14:10:07 -0700 Subject: [PATCH 4/9] ref(seer): Batch Snuba queries 
for event fetching and improve tests Replace per-group get_latest_event() calls with batched Snuba queries via bulk_snuba_queries for the event fetching phase. Uses a tight timestamp window around each group's last_seen. Also reduces inter-batch delay to 1s, rewrites cursor resumption test to verify only post-cursor groups are processed, and adds exact batch boundary edge case test. Co-Authored-By: Claude Opus 4.6 --- .../seer/backfill_supergroups_lightweight.py | 78 +++++++++++++++---- .../test_backfill_supergroups_lightweight.py | 52 ++++++++++++- 2 files changed, 114 insertions(+), 16 deletions(-) diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py index 7e440f56bf0895..070930f0a3c082 100644 --- a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py +++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py @@ -1,6 +1,9 @@ import logging +from collections.abc import Sequence from datetime import UTC, datetime, timedelta +from snuba_sdk import Column, Condition, Direction, Entity, Limit, Op, OrderBy, Query, Request + from sentry import features, options from sentry.api.serializers import EventSerializer, serialize from sentry.eventstore import backend as eventstore @@ -12,16 +15,18 @@ SeerViewerContext, make_lightweight_rca_cluster_request, ) +from sentry.snuba.dataset import Dataset from sentry.tasks.base import instrumented_task from sentry.taskworker.namespaces import seer_tasks from sentry.types.group import GroupSubStatus from sentry.utils import metrics +from sentry.utils.snuba import bulk_snuba_queries logger = logging.getLogger(__name__) BACKFILL_LAST_SEEN_DAYS = 90 BATCH_SIZE = 50 -INTER_BATCH_DELAY_S = 5 +INTER_BATCH_DELAY_S = 1 @instrumented_task( @@ -102,20 +107,7 @@ def backfill_supergroups_lightweight_for_org( return # Phase 1: Batch fetch event data - group_event_pairs: list[tuple[Group, dict]] = [] - for group in groups: - event = group.get_latest_event() - if 
not event: - continue - - ready_event = eventstore.get_event_by_id( - group.project_id, event.event_id, group_id=group.id - ) - if not ready_event: - continue - - serialized_event = serialize(ready_event, None, EventSerializer()) - group_event_pairs.append((group, serialized_event)) + group_event_pairs = _batch_fetch_events(groups, organization_id) # Phase 2: Send to Seer (per-group for now, bulk-ready) failure_count = 0 @@ -184,3 +176,59 @@ def backfill_supergroups_lightweight_for_org( "supergroups_backfill_lightweight.org_completed", extra={"organization_id": organization_id}, ) + + +def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[tuple[Group, dict]]: + """ + Fetch the latest event for each group using batched Snuba queries, + then serialize each event for sending to Seer. + """ + now = datetime.now(UTC) + timestamp_start = now - timedelta(days=BACKFILL_LAST_SEEN_DAYS) + + # Build one Snuba request per group to find the latest event_id + snuba_requests = [] + for group in groups: + # Use a tight window around the group's last_seen to minimize scan range, + # falling back to the full backfill window if last_seen is unavailable + group_start = group.last_seen - timedelta(hours=1) if group.last_seen else timestamp_start + snuba_requests.append( + Request( + dataset=Dataset.Events.value, + app_id="supergroups_backfill", + query=Query( + match=Entity(Dataset.Events.value), + select=[Column("event_id"), Column("group_id"), Column("project_id")], + where=[ + Condition(Column("project_id"), Op.EQ, group.project_id), + Condition(Column("group_id"), Op.EQ, group.id), + Condition(Column("timestamp"), Op.GTE, group_start), + Condition(Column("timestamp"), Op.LT, now + timedelta(minutes=5)), + ], + orderby=[OrderBy(Column("timestamp"), Direction.DESC)], + limit=Limit(1), + ), + tenant_ids={"organization_id": organization_id}, + ) + ) + + results = bulk_snuba_queries( + snuba_requests, referrer="supergroups_backfill_lightweight.get_latest_events" 
+ ) + + # Fetch full events from nodestore and serialize + group_event_pairs: list[tuple[Group, dict]] = [] + for group, result in zip(groups, results): + rows = result.get("data", []) + if not rows: + continue + + event_id = rows[0]["event_id"] + ready_event = eventstore.get_event_by_id(group.project_id, event_id, group_id=group.id) + if not ready_event: + continue + + serialized_event = serialize(ready_event, None, EventSerializer()) + group_event_pairs.append((group, serialized_event)) + + return group_event_pairs diff --git a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py index 806ee89aef6fe0..6a3b54e20b523d 100644 --- a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py +++ b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py @@ -174,10 +174,60 @@ def test_skips_non_error_groups(self, mock_request): "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" ) def test_resumes_from_cursor(self, mock_request): + mock_request.return_value = MagicMock(status=200) + + event2 = self.store_event( + data={"message": "second error", "level": "error", "fingerprint": ["group2"]}, + project_id=self.project.id, + ) + assert event2.group is not None + event2.group.substatus = GroupSubStatus.NEW + event2.group.save(update_fields=["substatus"]) + + # Resume from cursor pointing at the first group — should only process the second backfill_supergroups_lightweight_for_org( self.organization.id, last_project_id=self.project.id, last_group_id=self.group.id, ) - mock_request.assert_not_called() + mock_request.assert_called_once() + assert mock_request.call_args.args[0]["group_id"] == event2.group.id + + @with_feature("organizations:supergroups-lightweight-rca-clustering-write") + @patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" + ) + def 
test_chains_then_completes_on_exact_batch_boundary(self, mock_request): + mock_request.return_value = MagicMock(status=200) + + # Create exactly BATCH_SIZE groups total (setUp already created 1) + for i in range(BATCH_SIZE - 1): + evt = self.store_event( + data={ + "message": f"error {i}", + "level": "error", + "fingerprint": [f"boundary-{i}"], + }, + project_id=self.project.id, + ) + assert evt.group is not None + evt.group.substatus = GroupSubStatus.NEW + evt.group.save(update_fields=["substatus"]) + + # First call: full batch, should self-chain + with patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" + ) as mock_chain: + backfill_supergroups_lightweight_for_org(self.organization.id) + mock_chain.assert_called_once() + next_kwargs = mock_chain.call_args.kwargs["kwargs"] + + # Second call with the cursor: no groups left, should not chain + mock_request.reset_mock() + with patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" + ) as mock_chain: + backfill_supergroups_lightweight_for_org(self.organization.id, **next_kwargs) + mock_request.assert_not_called() + mock_chain.assert_not_called() From 0f9a6487c755964a78a9a17ff47689c75a83b9ca Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Wed, 8 Apr 2026 14:58:59 -0700 Subject: [PATCH 5/9] ref(seer): Use UNRESOLVED_SUBSTATUS_CHOICES constant Replace hardcoded substatus list with the canonical UNRESOLVED_SUBSTATUS_CHOICES constant from sentry.types.group. 
Co-Authored-By: Claude Opus 4.6 --- .../tasks/seer/backfill_supergroups_lightweight.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py index 070930f0a3c082..695a6e47ff7bca 100644 --- a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py +++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py @@ -18,7 +18,7 @@ from sentry.snuba.dataset import Dataset from sentry.tasks.base import instrumented_task from sentry.taskworker.namespaces import seer_tasks -from sentry.types.group import GroupSubStatus +from sentry.types.group import UNRESOLVED_SUBSTATUS_CHOICES from sentry.utils import metrics from sentry.utils.snuba import bulk_snuba_queries @@ -78,12 +78,7 @@ def backfill_supergroups_lightweight_for_org( project_id__in=project_ids, type=DEFAULT_TYPE_ID, last_seen__gte=cutoff, - substatus__in=[ - GroupSubStatus.ONGOING, - GroupSubStatus.NEW, - GroupSubStatus.ESCALATING, - GroupSubStatus.REGRESSED, - ], + substatus__in=UNRESOLVED_SUBSTATUS_CHOICES, ) if last_group_id > 0: From c6e1a849acf8741fd6a105919932311f65a57492 Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Thu, 9 Apr 2026 13:52:24 -0700 Subject: [PATCH 6/9] ref(seer): Iterate by project for index efficiency and add failure threshold Refactor to process one project at a time using the (project, status, substatus, last_seen, id) composite index for efficient cursor pagination at any scale. Add MAX_FAILURES_PER_BATCH=20 to stop processing if Seer is consistently failing. Filter by status=UNRESOLVED. Remove dead timestamp fallback. 
Co-Authored-By: Claude Opus 4.6 --- .../seer/backfill_supergroups_lightweight.py | 101 ++++++++++-------- .../test_backfill_supergroups_lightweight.py | 50 +++++++-- 2 files changed, 98 insertions(+), 53 deletions(-) diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py index 695a6e47ff7bca..a5b45c42d1b9e9 100644 --- a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py +++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py @@ -7,7 +7,7 @@ from sentry import features, options from sentry.api.serializers import EventSerializer, serialize from sentry.eventstore import backend as eventstore -from sentry.models.group import DEFAULT_TYPE_ID, Group +from sentry.models.group import DEFAULT_TYPE_ID, Group, GroupStatus from sentry.models.organization import Organization from sentry.models.project import Project from sentry.seer.signed_seer_api import ( @@ -27,6 +27,7 @@ BACKFILL_LAST_SEEN_DAYS = 90 BATCH_SIZE = 50 INTER_BATCH_DELAY_S = 1 +MAX_FAILURES_PER_BATCH = 20 @instrumented_task( @@ -56,48 +57,52 @@ def backfill_supergroups_lightweight_for_org( ) return - project_ids = list( + # Get the next project to process, starting from where we left off + project = ( Project.objects.filter( organization_id=organization_id, - id__gte=last_project_id, + id__gte=last_project_id or 0, ) .order_by("id") - .values_list("id", flat=True) + .first() ) - if not project_ids: + if not project: logger.info( "supergroups_backfill_lightweight.org_completed", extra={"organization_id": organization_id}, ) return - cutoff = datetime.now(UTC) - timedelta(days=BACKFILL_LAST_SEEN_DAYS) - - group_filter = Group.objects.filter( - project_id__in=project_ids, - type=DEFAULT_TYPE_ID, - last_seen__gte=cutoff, - substatus__in=UNRESOLVED_SUBSTATUS_CHOICES, - ) + # If we moved to a new project, reset the group cursor + if project.id != last_project_id: + last_group_id = 0 - if last_group_id > 0: - group_filter = 
group_filter.filter( - project_id=last_project_id, id__gt=last_group_id - ) | group_filter.filter(project_id__gt=last_project_id) - else: - group_filter = group_filter.filter(project_id__gte=last_project_id) + cutoff = datetime.now(UTC) - timedelta(days=BACKFILL_LAST_SEEN_DAYS) groups = list( - group_filter.select_related("project", "project__organization").order_by( - "project_id", "id" - )[:BATCH_SIZE] + Group.objects.filter( + project_id=project.id, + type=DEFAULT_TYPE_ID, + id__gt=last_group_id, + last_seen__gte=cutoff, + status=GroupStatus.UNRESOLVED, + substatus__in=UNRESOLVED_SUBSTATUS_CHOICES, + ) + .select_related("project", "project__organization") + .order_by("id")[:BATCH_SIZE] ) if not groups: - logger.info( - "supergroups_backfill_lightweight.org_completed", - extra={"organization_id": organization_id}, + # Current project exhausted, move to the next one + backfill_supergroups_lightweight_for_org.apply_async( + args=[organization_id], + kwargs={ + "last_project_id": project.id + 1, + "last_group_id": 0, + }, + countdown=INTER_BATCH_DELAY_S, + headers={"sentry-propagate-traces": False}, ) return @@ -145,6 +150,16 @@ def backfill_supergroups_lightweight_for_org( ) failure_count += 1 + if failure_count >= MAX_FAILURES_PER_BATCH: + logger.error( + "supergroups_backfill_lightweight.max_failures_reached", + extra={ + "organization_id": organization_id, + "failure_count": failure_count, + }, + ) + break + metrics.incr( "seer.supergroups_backfill_lightweight.groups_processed", amount=success_count, @@ -154,23 +169,23 @@ def backfill_supergroups_lightweight_for_org( amount=failure_count, ) - # Self-chain if there are more groups to process + # Self-chain: more groups in this project, or move to next project if len(groups) == BATCH_SIZE: - last_group = groups[-1] - backfill_supergroups_lightweight_for_org.apply_async( - args=[organization_id], - kwargs={ - "last_project_id": last_group.project_id, - "last_group_id": last_group.id, - }, - 
countdown=INTER_BATCH_DELAY_S, - headers={"sentry-propagate-traces": False}, - ) + next_project_id = project.id + next_group_id = groups[-1].id else: - logger.info( - "supergroups_backfill_lightweight.org_completed", - extra={"organization_id": organization_id}, - ) + next_project_id = project.id + 1 + next_group_id = 0 + + backfill_supergroups_lightweight_for_org.apply_async( + args=[organization_id], + kwargs={ + "last_project_id": next_project_id, + "last_group_id": next_group_id, + }, + countdown=INTER_BATCH_DELAY_S, + headers={"sentry-propagate-traces": False}, + ) def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[tuple[Group, dict]]: @@ -179,14 +194,12 @@ def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[t then serialize each event for sending to Seer. """ now = datetime.now(UTC) - timestamp_start = now - timedelta(days=BACKFILL_LAST_SEEN_DAYS) # Build one Snuba request per group to find the latest event_id snuba_requests = [] for group in groups: - # Use a tight window around the group's last_seen to minimize scan range, - # falling back to the full backfill window if last_seen is unavailable - group_start = group.last_seen - timedelta(hours=1) if group.last_seen else timestamp_start + # Use a tight window around the group's last_seen to minimize Snuba scan range + group_start = group.last_seen - timedelta(hours=1) snuba_requests.append( Request( dataset=Dataset.Events.value, diff --git a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py index 6a3b54e20b523d..c21486737caf80 100644 --- a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py +++ b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py @@ -43,7 +43,8 @@ def test_processes_groups_and_sends_to_seer(self, mock_request): @patch( "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" ) - def 
test_processes_groups_across_projects(self, mock_request): + def test_moves_to_next_project_after_current_exhausted(self, mock_request): + """After finishing one project's groups, chains to the next project.""" mock_request.return_value = MagicMock(status=200) project2 = self.create_project(organization=self.organization) @@ -55,11 +56,22 @@ def test_processes_groups_across_projects(self, mock_request): event2.group.substatus = GroupSubStatus.NEW event2.group.save(update_fields=["substatus"]) - backfill_supergroups_lightweight_for_org(self.organization.id) + # First call processes first project's groups + with patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" + ) as mock_chain: + backfill_supergroups_lightweight_for_org(self.organization.id) - assert mock_request.call_count == 2 - project_ids = {call.args[0]["project_id"] for call in mock_request.call_args_list} - assert project_ids == {self.project.id, project2.id} + # Should chain to next project + mock_chain.assert_called_once() + next_kwargs = mock_chain.call_args.kwargs["kwargs"] + assert next_kwargs["last_group_id"] == 0 + + # Second call processes second project's groups + mock_request.reset_mock() + backfill_supergroups_lightweight_for_org(self.organization.id, **next_kwargs) + mock_request.assert_called_once() + assert mock_request.call_args.args[0]["project_id"] == project2.id @with_feature("organizations:supergroups-lightweight-rca-clustering-write") @patch( @@ -89,6 +101,7 @@ def test_self_chains_when_more_groups_exist(self, mock_request): mock_chain.assert_called_once() call_kwargs = mock_chain.call_args.kwargs["kwargs"] + # Should stay on the same project with a group cursor assert call_kwargs["last_project_id"] == self.project.id assert call_kwargs["last_group_id"] > 0 @@ -96,14 +109,20 @@ def test_self_chains_when_more_groups_exist(self, mock_request): @patch( 
"sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" ) - def test_does_not_chain_when_batch_incomplete(self, mock_request): + def test_completes_when_no_more_projects(self, mock_request): + """When all projects are exhausted, the task completes without chaining.""" mock_request.return_value = MagicMock(status=200) with patch( "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" ) as mock_chain: - backfill_supergroups_lightweight_for_org(self.organization.id) + # Pass a project ID higher than any real project + backfill_supergroups_lightweight_for_org( + self.organization.id, + last_project_id=self.project.id + 9999, + ) + mock_request.assert_not_called() mock_chain.assert_not_called() @patch( @@ -215,19 +234,32 @@ def test_chains_then_completes_on_exact_batch_boundary(self, mock_request): evt.group.substatus = GroupSubStatus.NEW evt.group.save(update_fields=["substatus"]) - # First call: full batch, should self-chain + # First call: full batch, chains with same project and group cursor with patch( "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" ) as mock_chain: backfill_supergroups_lightweight_for_org(self.organization.id) mock_chain.assert_called_once() next_kwargs = mock_chain.call_args.kwargs["kwargs"] + assert next_kwargs["last_project_id"] == self.project.id + assert next_kwargs["last_group_id"] > 0 - # Second call with the cursor: no groups left, should not chain + # Second call: no groups left in project, chains to next project mock_request.reset_mock() with patch( "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" ) as mock_chain: backfill_supergroups_lightweight_for_org(self.organization.id, **next_kwargs) mock_request.assert_not_called() + mock_chain.assert_called_once() + final_kwargs = mock_chain.call_args.kwargs["kwargs"] + assert 
final_kwargs["last_group_id"] == 0 + + # Third call: no more projects, completes + mock_request.reset_mock() + with patch( + "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async" + ) as mock_chain: + backfill_supergroups_lightweight_for_org(self.organization.id, **final_kwargs) + mock_request.assert_not_called() mock_chain.assert_not_called() From 207b5765ef9a82839676a74f70c61567c3272897 Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Fri, 10 Apr 2026 14:11:42 -0700 Subject: [PATCH 7/9] fix(seer): Fix backfill cursor, failure handling, and group filtering - Track last_processed_group_id so early break on max failures doesn't skip unprocessed groups - Stop self-chaining when max failures is reached to avoid hammering Seer when it's down - Add project_id and last_processed_group_id to max failures log for easier resume - Skip groups with failed event serialization instead of sending None - Remove last_seen cutoff filter; old groups are naturally skipped when their events are gone from Snuba/nodestore --- .../seer/backfill_supergroups_lightweight.py | 16 +++++++++++----- .../test_backfill_supergroups_lightweight.py | 6 +++++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py index a5b45c42d1b9e9..2b81c4d197d17f 100644 --- a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py +++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py @@ -24,7 +24,6 @@ logger = logging.getLogger(__name__) -BACKFILL_LAST_SEEN_DAYS = 90 BATCH_SIZE = 50 INTER_BATCH_DELAY_S = 1 MAX_FAILURES_PER_BATCH = 20 @@ -78,14 +77,11 @@ def backfill_supergroups_lightweight_for_org( if project.id != last_project_id: last_group_id = 0 - cutoff = datetime.now(UTC) - timedelta(days=BACKFILL_LAST_SEEN_DAYS) - groups = list( Group.objects.filter( project_id=project.id, type=DEFAULT_TYPE_ID, id__gt=last_group_id, 
- last_seen__gte=cutoff, status=GroupStatus.UNRESOLVED, substatus__in=UNRESOLVED_SUBSTATUS_CHOICES, ) @@ -112,6 +108,7 @@ def backfill_supergroups_lightweight_for_org( # Phase 2: Send to Seer (per-group for now, bulk-ready) failure_count = 0 success_count = 0 + last_processed_group_id = last_group_id viewer_context = SeerViewerContext(organization_id=organization_id) for group, serialized_event in group_event_pairs: @@ -150,12 +147,16 @@ def backfill_supergroups_lightweight_for_org( ) failure_count += 1 + last_processed_group_id = group.id + if failure_count >= MAX_FAILURES_PER_BATCH: logger.error( "supergroups_backfill_lightweight.max_failures_reached", extra={ "organization_id": organization_id, + "project_id": project.id, "failure_count": failure_count, + "last_processed_group_id": last_processed_group_id, }, ) break @@ -169,10 +170,13 @@ def backfill_supergroups_lightweight_for_org( amount=failure_count, ) + if failure_count >= MAX_FAILURES_PER_BATCH: + return + # Self-chain: more groups in this project, or move to next project if len(groups) == BATCH_SIZE: next_project_id = project.id - next_group_id = groups[-1].id + next_group_id = last_processed_group_id else: next_project_id = project.id + 1 next_group_id = 0 @@ -237,6 +241,8 @@ def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[t continue serialized_event = serialize(ready_event, None, EventSerializer()) + if not serialized_event: + continue group_event_pairs.append((group, serialized_event)) return group_event_pairs diff --git a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py index c21486737caf80..1ac5bfec79d7ea 100644 --- a/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py +++ b/tests/sentry/tasks/seer/test_backfill_supergroups_lightweight.py @@ -168,10 +168,14 @@ def test_continues_on_individual_group_failure(self, mock_request): @patch( 
"sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request" ) - def test_filters_old_groups(self, mock_request): + @patch("sentry.tasks.seer.backfill_supergroups_lightweight.bulk_snuba_queries") + def test_skips_old_groups_with_no_events(self, mock_snuba, mock_request): + """Old groups are still fetched from DB but skipped when Snuba returns no events.""" self.group.last_seen = datetime.now(UTC) - timedelta(days=91) self.group.save(update_fields=["last_seen"]) + mock_snuba.return_value = [{"data": []}] + backfill_supergroups_lightweight_for_org(self.organization.id) mock_request.assert_not_called() From d45bf7e492702819dfa1394c777abbb7fdf7fc45 Mon Sep 17 00:00:00 2001 From: Yuval Mandelboum Date: Fri, 10 Apr 2026 14:49:53 -0700 Subject: [PATCH 8/9] perf(seer): Batch nodestore fetches and serialization in backfill Use bind_nodes() for a single nodestore multi-get instead of 50 sequential get_event_by_id calls. Bulk serialize all events in one serialize() call to batch get_attrs(). Cuts the event fetch phase from ~3-5s to ~500ms per batch. 
--- .../seer/backfill_supergroups_lightweight.py | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py index 2b81c4d197d17f..88225d2bfe5d3d 100644 --- a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py +++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py @@ -15,6 +15,7 @@ SeerViewerContext, make_lightweight_rca_cluster_request, ) +from sentry.services.eventstore.models import Event from sentry.snuba.dataset import Dataset from sentry.tasks.base import instrumented_task from sentry.taskworker.namespaces import seer_tasks @@ -228,21 +229,29 @@ def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[t snuba_requests, referrer="supergroups_backfill_lightweight.get_latest_events" ) - # Fetch full events from nodestore and serialize - group_event_pairs: list[tuple[Group, dict]] = [] + # Build unfetched Event objects from Snuba results, keeping groups aligned + matched_groups: list[Group] = [] + events: list[Event] = [] for group, result in zip(groups, results): rows = result.get("data", []) if not rows: continue + matched_groups.append(group) + events.append( + Event(project_id=group.project_id, event_id=rows[0]["event_id"], group_id=group.id) + ) - event_id = rows[0]["event_id"] - ready_event = eventstore.get_event_by_id(group.project_id, event_id, group_id=group.id) - if not ready_event: - continue + if not events: + return [] - serialized_event = serialize(ready_event, None, EventSerializer()) - if not serialized_event: - continue - group_event_pairs.append((group, serialized_event)) + # Batch fetch all event data from nodestore in one multi-get + eventstore.bind_nodes(events) + + # Bulk serialize all events + serialized_events = serialize(events, None, EventSerializer()) - return group_event_pairs + return [ + (group, serialized_event) + for group, serialized_event in 
zip(matched_groups, serialized_events)
+        if serialized_event
+    ]

From 4523c6c8818fb65f21c434772a0306671fa5785b Mon Sep 17 00:00:00 2001
From: Yuval Mandelboum
Date: Fri, 10 Apr 2026 15:32:11 -0700
Subject: [PATCH 9/9] fix(seer): Use groups[-1].id for backfill cursor to avoid
 infinite loop

last_processed_group_id only tracks groups that had Snuba events, so the
cursor would never advance past eventless groups at the end of a batch,
re-fetching the same batch in an infinite loop. Since we now return early on
max failures (no self-chain), groups[-1].id is safe to use for the cursor.
---
 src/sentry/tasks/seer/backfill_supergroups_lightweight.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py
index 88225d2bfe5d3d..1829f19bcd7994 100644
--- a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py
+++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py
@@ -177,7 +177,7 @@ def backfill_supergroups_lightweight_for_org(
     # Self-chain: more groups in this project, or move to next project
     if len(groups) == BATCH_SIZE:
         next_project_id = project.id
-        next_group_id = last_processed_group_id
+        next_group_id = groups[-1].id
     else:
         next_project_id = project.id + 1
         next_group_id = 0