diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 0e52be1d1bb0f6..3bc7499b8769f7 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -993,6 +993,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: "sentry.tasks.seer.context_engine_index", "sentry.tasks.seer.lightweight_rca_cluster", "sentry.tasks.seer.night_shift.cron", + "sentry.tasks.seer.backfill_supergroups_lightweight", # Used for tests "sentry.taskworker.tasks.examples", ) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 8cc2e60b7a4b20..be4bdfc2c5b07e 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -1373,6 +1373,13 @@ flags=FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "seer.supergroups_backfill_lightweight.killswitch", + type=Bool, + default=False, + flags=FLAG_MODIFIABLE_BOOL | FLAG_AUTOMATOR_MODIFIABLE, +) + # ## sentry.killswitches # # The following options are documented in sentry.killswitches in more detail diff --git a/src/sentry/tasks/seer/backfill_supergroups_lightweight.py b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py new file mode 100644 index 00000000000000..1829f19bcd7994 --- /dev/null +++ b/src/sentry/tasks/seer/backfill_supergroups_lightweight.py @@ -0,0 +1,257 @@ +import logging +from collections.abc import Sequence +from datetime import UTC, datetime, timedelta + +from snuba_sdk import Column, Condition, Direction, Entity, Limit, Op, OrderBy, Query, Request + +from sentry import features, options +from sentry.api.serializers import EventSerializer, serialize +from sentry.eventstore import backend as eventstore +from sentry.models.group import DEFAULT_TYPE_ID, Group, GroupStatus +from sentry.models.organization import Organization +from sentry.models.project import Project +from sentry.seer.signed_seer_api import ( + LightweightRCAClusterRequest, + SeerViewerContext, + make_lightweight_rca_cluster_request, +) +from sentry.services.eventstore.models import Event +from 
sentry.snuba.dataset import Dataset
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import seer_tasks
from sentry.types.group import UNRESOLVED_SUBSTATUS_CHOICES
from sentry.utils import metrics
from sentry.utils.snuba import bulk_snuba_queries

logger = logging.getLogger(__name__)

# Tuning knobs for the self-chaining backfill loop.
BATCH_SIZE = 50  # max groups fetched/processed per task invocation
INTER_BATCH_DELAY_S = 1  # seconds between chained invocations, paces Seer traffic
MAX_FAILURES_PER_BATCH = 20  # stop the whole backfill once one batch hits this many failures


@instrumented_task(
    name="sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org",
    namespace=seer_tasks,
    processing_deadline_duration=15 * 60,
)
def backfill_supergroups_lightweight_for_org(
    organization_id: int,
    last_project_id: int = 0,
    last_group_id: int = 0,
    **kwargs,
) -> None:
    """Backfill one batch of an org's unresolved error groups into Seer.

    Walks the organization's projects in id order, and within each project its
    unresolved default-type groups in id order, using the
    (last_project_id, last_group_id) pair as a resumable cursor. For each group
    the latest event is fetched (via _batch_fetch_events) and sent to the Seer
    lightweight RCA clustering endpoint. The task then re-schedules itself —
    same project with an advanced group cursor when the batch was full,
    otherwise the next project — until no projects remain.
    """
    # Global kill switch: bail out without re-scheduling, which halts the
    # whole self-chaining backfill for every org.
    if options.get("seer.supergroups_backfill_lightweight.killswitch"):
        logger.info("supergroups_backfill_lightweight.killswitch_enabled")
        return

    try:
        organization = Organization.objects.get(id=organization_id)
    except Organization.DoesNotExist:
        # Org was deleted since the task was scheduled; nothing to do.
        return

    if not features.has("organizations:supergroups-lightweight-rca-clustering-write", organization):
        logger.info(
            "supergroups_backfill_lightweight.feature_not_enabled",
            extra={"organization_id": organization_id},
        )
        return

    # Get the next project to process, starting from where we left off.
    # id__gte (not __gt) so a resumed cursor re-selects the same project
    # while it still has groups to process.
    project = (
        Project.objects.filter(
            organization_id=organization_id,
            id__gte=last_project_id or 0,
        )
        .order_by("id")
        .first()
    )

    if not project:
        # Cursor has moved past the org's highest project id: backfill done.
        logger.info(
            "supergroups_backfill_lightweight.org_completed",
            extra={"organization_id": organization_id},
        )
        return

    # If we moved to a new project, reset the group cursor.
    if project.id != last_project_id:
        last_group_id = 0

    groups = list(
        Group.objects.filter(
            project_id=project.id,
            type=DEFAULT_TYPE_ID,
            id__gt=last_group_id,
            status=GroupStatus.UNRESOLVED,
            substatus__in=UNRESOLVED_SUBSTATUS_CHOICES,
        )
        .select_related("project", "project__organization")
        .order_by("id")[:BATCH_SIZE]
    )

    if not groups:
        # Current project exhausted, move to the next one.
        backfill_supergroups_lightweight_for_org.apply_async(
            args=[organization_id],
            kwargs={
                "last_project_id": project.id + 1,
                "last_group_id": 0,
            },
            countdown=INTER_BATCH_DELAY_S,
            headers={"sentry-propagate-traces": False},
        )
        return

    # Phase 1: Batch fetch event data.
    group_event_pairs = _batch_fetch_events(groups, organization_id)

    # Phase 2: Send to Seer (per-group for now, bulk-ready).
    failure_count = 0
    success_count = 0
    last_processed_group_id = last_group_id
    viewer_context = SeerViewerContext(organization_id=organization_id)

    for group, serialized_event in group_event_pairs:
        try:
            body = LightweightRCAClusterRequest(
                group_id=group.id,
                issue={
                    "id": group.id,
                    "title": group.title,
                    "short_id": group.qualified_short_id,
                    "events": [serialized_event],
                },
                organization_slug=organization.slug,
                organization_id=organization_id,
                project_id=group.project_id,
            )
            response = make_lightweight_rca_cluster_request(
                body, timeout=30, viewer_context=viewer_context
            )
            if response.status >= 400:
                # Non-2xx from Seer counts as a failure but does not abort
                # the batch on its own.
                logger.warning(
                    "supergroups_backfill_lightweight.seer_error",
                    extra={
                        "group_id": group.id,
                        "project_id": group.project_id,
                        "status": response.status,
                    },
                )
                failure_count += 1
            else:
                success_count += 1
        except Exception:
            # Per-group isolation: one bad group must not stop the others.
            logger.exception(
                "supergroups_backfill_lightweight.group_failed",
                extra={"group_id": group.id, "project_id": group.project_id},
            )
            failure_count += 1

        last_processed_group_id = group.id

        if failure_count >= MAX_FAILURES_PER_BATCH:
            # Too many failures in one batch: log where we stopped and abort.
            # NOTE(review): the early `return` below means the backfill does
            # NOT re-schedule after this — it stalls at this cursor until
            # re-kicked manually. Confirm that is the intended behavior.
            logger.error(
                "supergroups_backfill_lightweight.max_failures_reached",
                extra={
                    "organization_id": organization_id,
                    "project_id": project.id,
                    "failure_count": failure_count,
                    "last_processed_group_id": last_processed_group_id,
                },
            )
            break

    metrics.incr(
        "seer.supergroups_backfill_lightweight.groups_processed",
        amount=success_count,
    )
    metrics.incr(
        "seer.supergroups_backfill_lightweight.groups_failed",
        amount=failure_count,
    )

    if failure_count >= MAX_FAILURES_PER_BATCH:
        return

    # Self-chain: more groups in this project, or move to next project.
    if len(groups) == BATCH_SIZE:
        # Full batch: there may be more groups; stay on this project with
        # the group cursor advanced past the last processed group.
        next_project_id = project.id
        next_group_id = groups[-1].id
    else:
        # Short batch: this project is done; advance the project cursor.
        next_project_id = project.id + 1
        next_group_id = 0

    backfill_supergroups_lightweight_for_org.apply_async(
        args=[organization_id],
        kwargs={
            "last_project_id": next_project_id,
            "last_group_id": next_group_id,
        },
        countdown=INTER_BATCH_DELAY_S,
        headers={"sentry-propagate-traces": False},
    )


def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[tuple[Group, dict]]:
    """
    Fetch the latest event for each group using batched Snuba queries,
    then serialize each event for sending to Seer.

    Returns (group, serialized_event) pairs only for groups that still have
    an event in Snuba; groups with no rows (e.g. past retention) are dropped.
    """
    now = datetime.now(UTC)

    # Build one Snuba request per group to find the latest event_id.
    snuba_requests = []
    for group in groups:
        # Use a tight window around the group's last_seen to minimize Snuba scan range.
        group_start = group.last_seen - timedelta(hours=1)
        snuba_requests.append(
            Request(
                dataset=Dataset.Events.value,
                app_id="supergroups_backfill",
                query=Query(
                    match=Entity(Dataset.Events.value),
                    select=[Column("event_id"), Column("group_id"), Column("project_id")],
                    where=[
                        Condition(Column("project_id"), Op.EQ, group.project_id),
                        Condition(Column("group_id"), Op.EQ, group.id),
                        Condition(Column("timestamp"), Op.GTE, group_start),
                        # +5min upper bound allows slight clock skew on ingest.
                        Condition(Column("timestamp"), Op.LT, now + timedelta(minutes=5)),
                    ],
                    orderby=[OrderBy(Column("timestamp"), Direction.DESC)],
                    limit=Limit(1),
                ),
                tenant_ids={"organization_id": organization_id},
            )
        )

    # NOTE(review): relies on bulk_snuba_queries returning results in the
    # same order as the requests — verify against its contract.
    results = bulk_snuba_queries(
        snuba_requests, referrer="supergroups_backfill_lightweight.get_latest_events"
    )

    # Build unfetched Event objects from Snuba results, keeping groups aligned.
    matched_groups: list[Group] = []
    events: list[Event] = []
    for group, result in zip(groups, results):
        rows = result.get("data", [])
        if not rows:
            continue
        matched_groups.append(group)
        events.append(
            Event(project_id=group.project_id, event_id=rows[0]["event_id"], group_id=group.id)
        )

    if not events:
        return []

    # Batch fetch all event data from nodestore in one multi-get.
    eventstore.bind_nodes(events)

    # Bulk serialize all events.
    serialized_events = serialize(events, None, EventSerializer())

    return [
        (group, serialized_event)
        for group, serialized_event in zip(matched_groups, serialized_events)
        if serialized_event
    ]
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock, patch

from sentry.models.group import DEFAULT_TYPE_ID
from sentry.tasks.seer.backfill_supergroups_lightweight import (
    BATCH_SIZE,
    backfill_supergroups_lightweight_for_org,
)
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.features import with_feature
from sentry.types.group import GroupSubStatus


class BackfillSupergroupsLightweightForOrgTest(TestCase):
    """Tests for the self-chaining supergroups lightweight backfill task."""

    def setUp(self):
        super().setUp()
        # One unresolved NEW group in the default project; most tests build on it.
        self.event = self.store_event(
            data={"message": "test error", "level": "error"},
            project_id=self.project.id,
        )
        self.group = self.event.group
        self.group.substatus = GroupSubStatus.NEW
        self.group.save(update_fields=["substatus"])

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_processes_groups_and_sends_to_seer(self, mock_request):
        """A group with a recent event is sent to Seer with the expected payload."""
        mock_request.return_value = MagicMock(status=200)

        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_request.assert_called_once()
        body = mock_request.call_args.args[0]
        assert body["group_id"] == self.group.id
        assert body["project_id"] == self.project.id
        assert body["organization_id"] == self.organization.id
        assert body["issue"]["id"] == self.group.id
        assert len(body["issue"]["events"]) == 1

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_moves_to_next_project_after_current_exhausted(self, mock_request):
        """After finishing one project's groups, chains to the next project."""
        mock_request.return_value = MagicMock(status=200)

        project2 = self.create_project(organization=self.organization)
        event2 = self.store_event(
            data={"message": "error in project2", "level": "error"},
            project_id=project2.id,
        )
        assert event2.group is not None
        event2.group.substatus = GroupSubStatus.NEW
        event2.group.save(update_fields=["substatus"])

        # First call processes first project's groups
        with patch(
            "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async"
        ) as mock_chain:
            backfill_supergroups_lightweight_for_org(self.organization.id)

        # Should chain to next project
        mock_chain.assert_called_once()
        next_kwargs = mock_chain.call_args.kwargs["kwargs"]
        assert next_kwargs["last_group_id"] == 0

        # Second call processes second project's groups
        mock_request.reset_mock()
        backfill_supergroups_lightweight_for_org(self.organization.id, **next_kwargs)
        mock_request.assert_called_once()
        assert mock_request.call_args.args[0]["project_id"] == project2.id

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_self_chains_when_more_groups_exist(self, mock_request):
        """A full batch re-schedules the task on the same project with a group cursor."""
        mock_request.return_value = MagicMock(status=200)

        # Create enough groups to fill a batch
        for i in range(BATCH_SIZE):
            evt = self.store_event(
                data={
                    "message": f"error {i}",
                    "level": "error",
                    "fingerprint": [f"group-{i}"],
                },
                project_id=self.project.id,
            )
            assert evt.group is not None
            evt.group.substatus = GroupSubStatus.NEW
            evt.group.save(update_fields=["substatus"])

        with patch(
            "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async"
        ) as mock_chain:
            backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_chain.assert_called_once()
        call_kwargs = mock_chain.call_args.kwargs["kwargs"]
        # Should stay on the same project with a group cursor
        assert call_kwargs["last_project_id"] == self.project.id
        assert call_kwargs["last_group_id"] > 0

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_completes_when_no_more_projects(self, mock_request):
        """When all projects are exhausted, the task completes without chaining."""
        mock_request.return_value = MagicMock(status=200)

        with patch(
            "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async"
        ) as mock_chain:
            # Pass a project ID higher than any real project
            backfill_supergroups_lightweight_for_org(
                self.organization.id,
                last_project_id=self.project.id + 9999,
            )

        mock_request.assert_not_called()
        mock_chain.assert_not_called()

    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_respects_killswitch(self, mock_request):
        """The killswitch option short-circuits the task before any Seer call."""
        with self.options({"seer.supergroups_backfill_lightweight.killswitch": True}):
            backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_request.assert_not_called()

    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_skips_without_feature_flag(self, mock_request):
        """Without the org feature flag, nothing is sent to Seer."""
        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_request.assert_not_called()

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_continues_on_individual_group_failure(self, mock_request):
        """A Seer error for one group does not stop processing of the rest."""
        event2 = self.store_event(
            data={"message": "second error", "level": "error", "fingerprint": ["group2"]},
            project_id=self.project.id,
        )
        assert event2.group is not None
        event2.group.substatus = GroupSubStatus.NEW
        event2.group.save(update_fields=["substatus"])

        mock_request.side_effect = [
            MagicMock(status=500),
            MagicMock(status=200),
        ]

        backfill_supergroups_lightweight_for_org(self.organization.id)

        assert mock_request.call_count == 2

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    @patch("sentry.tasks.seer.backfill_supergroups_lightweight.bulk_snuba_queries")
    def test_skips_old_groups_with_no_events(self, mock_snuba, mock_request):
        """Old groups are still fetched from DB but skipped when Snuba returns no events."""
        self.group.last_seen = datetime.now(UTC) - timedelta(days=91)
        self.group.save(update_fields=["last_seen"])

        mock_snuba.return_value = [{"data": []}]

        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_request.assert_not_called()

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_skips_non_error_groups(self, mock_request):
        """Groups whose type is not the default (error) type are not processed."""
        self.group.type = DEFAULT_TYPE_ID + 1
        self.group.save(update_fields=["type"])

        backfill_supergroups_lightweight_for_org(self.organization.id)

        mock_request.assert_not_called()

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_resumes_from_cursor(self, mock_request):
        """A (project, group) cursor skips groups at or below the cursor."""
        mock_request.return_value = MagicMock(status=200)

        event2 = self.store_event(
            data={"message": "second error", "level": "error", "fingerprint": ["group2"]},
            project_id=self.project.id,
        )
        assert event2.group is not None
        event2.group.substatus = GroupSubStatus.NEW
        event2.group.save(update_fields=["substatus"])

        # Resume from cursor pointing at the first group — should only process the second
        backfill_supergroups_lightweight_for_org(
            self.organization.id,
            last_project_id=self.project.id,
            last_group_id=self.group.id,
        )

        mock_request.assert_called_once()
        assert mock_request.call_args.args[0]["group_id"] == event2.group.id

    @with_feature("organizations:supergroups-lightweight-rca-clustering-write")
    @patch(
        "sentry.tasks.seer.backfill_supergroups_lightweight.make_lightweight_rca_cluster_request"
    )
    def test_chains_then_completes_on_exact_batch_boundary(self, mock_request):
        """Exactly BATCH_SIZE groups: chain on project, then next project, then stop."""
        mock_request.return_value = MagicMock(status=200)

        # Create exactly BATCH_SIZE groups total (setUp already created 1)
        for i in range(BATCH_SIZE - 1):
            evt = self.store_event(
                data={
                    "message": f"error {i}",
                    "level": "error",
                    "fingerprint": [f"boundary-{i}"],
                },
                project_id=self.project.id,
            )
            assert evt.group is not None
            evt.group.substatus = GroupSubStatus.NEW
            evt.group.save(update_fields=["substatus"])

        # First call: full batch, chains with same project and group cursor
        with patch(
            "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async"
        ) as mock_chain:
            backfill_supergroups_lightweight_for_org(self.organization.id)
        mock_chain.assert_called_once()
        next_kwargs = mock_chain.call_args.kwargs["kwargs"]
        assert next_kwargs["last_project_id"] == self.project.id
        assert next_kwargs["last_group_id"] > 0

        # Second call: no groups left in project, chains to next project
        mock_request.reset_mock()
        with patch(
            "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async"
        ) as mock_chain:
            backfill_supergroups_lightweight_for_org(self.organization.id, **next_kwargs)
        mock_request.assert_not_called()
        mock_chain.assert_called_once()
        final_kwargs = mock_chain.call_args.kwargs["kwargs"]
        assert final_kwargs["last_group_id"] == 0

        # Third call: no more projects, completes
        mock_request.reset_mock()
        with patch(
            "sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org.apply_async"
        ) as mock_chain:
            backfill_supergroups_lightweight_for_org(self.organization.id, **final_kwargs)
        mock_request.assert_not_called()
        mock_chain.assert_not_called()