-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
feat(seer): Add lightweight supergroups backfill task #112507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7b309e4
3e509b5
694a5f8
86e0a24
4fdfd88
0f9a648
c6e1a84
207b576
d45bf7e
e0e92a1
4523c6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,257 @@ | ||
| import logging | ||
| from collections.abc import Sequence | ||
| from datetime import UTC, datetime, timedelta | ||
|
|
||
| from snuba_sdk import Column, Condition, Direction, Entity, Limit, Op, OrderBy, Query, Request | ||
|
|
||
| from sentry import features, options | ||
| from sentry.api.serializers import EventSerializer, serialize | ||
| from sentry.eventstore import backend as eventstore | ||
| from sentry.models.group import DEFAULT_TYPE_ID, Group, GroupStatus | ||
| from sentry.models.organization import Organization | ||
| from sentry.models.project import Project | ||
| from sentry.seer.signed_seer_api import ( | ||
| LightweightRCAClusterRequest, | ||
| SeerViewerContext, | ||
| make_lightweight_rca_cluster_request, | ||
| ) | ||
| from sentry.services.eventstore.models import Event | ||
| from sentry.snuba.dataset import Dataset | ||
| from sentry.tasks.base import instrumented_task | ||
| from sentry.taskworker.namespaces import seer_tasks | ||
| from sentry.types.group import UNRESOLVED_SUBSTATUS_CHOICES | ||
| from sentry.utils import metrics | ||
| from sentry.utils.snuba import bulk_snuba_queries | ||
|
|
||
logger = logging.getLogger(__name__)

# Number of groups processed per task invocation before self-chaining the next batch.
BATCH_SIZE = 50
# Delay (seconds) between chained batches — throttles the backfill so we do not
# overwhelm Seer or starve other workers (see PR discussion).
INTER_BATCH_DELAY_S = 1
# Once this many groups fail within a single batch, the backfill for the org is
# aborted (no further self-chaining) rather than continuing to hammer a failing path.
MAX_FAILURES_PER_BATCH = 20
|
|
||
|
|
||
@instrumented_task(
    name="sentry.tasks.seer.backfill_supergroups_lightweight.backfill_supergroups_lightweight_for_org",
    namespace=seer_tasks,
    processing_deadline_duration=15 * 60,
)
def backfill_supergroups_lightweight_for_org(
    organization_id: int,
    last_project_id: int = 0,
    last_group_id: int = 0,
    **kwargs,
) -> None:
    """
    Backfill lightweight RCA supergroup clustering for one organization.

    Processes one batch of up to BATCH_SIZE unresolved error groups from a
    single project, sends each group's latest event to Seer, then self-chains
    via ``apply_async`` with an updated ``(last_project_id, last_group_id)``
    cursor. Projects are walked in ascending id order; within a project,
    groups are walked in ascending id order.

    Args:
        organization_id: Organization to backfill.
        last_project_id: Project-id cursor; processing resumes at the first
            project with ``id >= last_project_id``.
        last_group_id: Group-id cursor within the current project; only groups
            with ``id > last_group_id`` are processed.
        **kwargs: Ignored; tolerated for forward/backward task-signature
            compatibility across deploys.

    Returns:
        None. Terminates (without rescheduling) on: killswitch, missing org,
        feature flag off, org fully processed, or too many failures in a batch.
    """
    # Operational killswitch — checked first so an enabled kill stops the
    # self-chaining loop on the very next invocation.
    if options.get("seer.supergroups_backfill_lightweight.killswitch"):
        logger.info("supergroups_backfill_lightweight.killswitch_enabled")
        return

    try:
        organization = Organization.objects.get(id=organization_id)
    except Organization.DoesNotExist:
        # Org deleted since the task was scheduled — nothing to do.
        return

    if not features.has("organizations:supergroups-lightweight-rca-clustering-write", organization):
        logger.info(
            "supergroups_backfill_lightweight.feature_not_enabled",
            extra={"organization_id": organization_id},
        )
        return

    # Get the next project to process, starting from where we left off
    project = (
        Project.objects.filter(
            organization_id=organization_id,
            id__gte=last_project_id or 0,
        )
        .order_by("id")
        .first()
    )

    if not project:
        # No project with id >= cursor: the whole org has been walked.
        logger.info(
            "supergroups_backfill_lightweight.org_completed",
            extra={"organization_id": organization_id},
        )
        return

    # If we moved to a new project, reset the group cursor
    # (group ids are global, but the per-project walk restarts from 0).
    if project.id != last_project_id:
        last_group_id = 0

    # One batch of unresolved error groups, ordered so the last element's id
    # can serve as the next cursor value.
    groups = list(
        Group.objects.filter(
            project_id=project.id,
            type=DEFAULT_TYPE_ID,
            id__gt=last_group_id,
            status=GroupStatus.UNRESOLVED,
            substatus__in=UNRESOLVED_SUBSTATUS_CHOICES,
        )
        .select_related("project", "project__organization")
        .order_by("id")[:BATCH_SIZE]
    )

    if not groups:
        # Current project exhausted, move to the next one
        backfill_supergroups_lightweight_for_org.apply_async(
            args=[organization_id],
            kwargs={
                "last_project_id": project.id + 1,
                "last_group_id": 0,
            },
            countdown=INTER_BATCH_DELAY_S,
            headers={"sentry-propagate-traces": False},
        )
        return

    # Phase 1: Batch fetch event data
    group_event_pairs = _batch_fetch_events(groups, organization_id)

    # Phase 2: Send to Seer (per-group for now, bulk-ready)
    # NOTE(review): requests are sequential; per PR discussion the Seer side
    # only enqueues a task, so each call should return quickly, but this loop
    # is still bounded by round-trip latency per group.
    failure_count = 0
    success_count = 0
    last_processed_group_id = last_group_id
    viewer_context = SeerViewerContext(organization_id=organization_id)

    for group, serialized_event in group_event_pairs:
        try:
            body = LightweightRCAClusterRequest(
                group_id=group.id,
                issue={
                    "id": group.id,
                    "title": group.title,
                    "short_id": group.qualified_short_id,
                    "events": [serialized_event],
                },
                organization_slug=organization.slug,
                organization_id=organization_id,
                project_id=group.project_id,
            )
            response = make_lightweight_rca_cluster_request(
                body, timeout=30, viewer_context=viewer_context
            )
            if response.status >= 400:
                # Non-2xx/3xx from Seer counts as a failure but does not
                # abort the loop until MAX_FAILURES_PER_BATCH is reached.
                logger.warning(
                    "supergroups_backfill_lightweight.seer_error",
                    extra={
                        "group_id": group.id,
                        "project_id": group.project_id,
                        "status": response.status,
                    },
                )
                failure_count += 1
            else:
                success_count += 1
        except Exception:
            # Best-effort per group: log and continue rather than failing
            # the whole batch on one bad group.
            logger.exception(
                "supergroups_backfill_lightweight.group_failed",
                extra={"group_id": group.id, "project_id": group.project_id},
            )
            failure_count += 1

        # Advance the within-batch cursor even on failure, so a retry does
        # not reprocess already-attempted groups.
        last_processed_group_id = group.id

        if failure_count >= MAX_FAILURES_PER_BATCH:
            logger.error(
                "supergroups_backfill_lightweight.max_failures_reached",
                extra={
                    "organization_id": organization_id,
                    "project_id": project.id,
                    "failure_count": failure_count,
                    "last_processed_group_id": last_processed_group_id,
                },
            )
            break

    metrics.incr(
        "seer.supergroups_backfill_lightweight.groups_processed",
        amount=success_count,
    )
    metrics.incr(
        "seer.supergroups_backfill_lightweight.groups_failed",
        amount=failure_count,
    )

    # Too many failures: stop the backfill for this org entirely (the chain
    # is intentionally not continued; a manual re-trigger is required).
    if failure_count >= MAX_FAILURES_PER_BATCH:
        return

    # Self-chain: more groups in this project, or move to next project
    if len(groups) == BATCH_SIZE:
        next_project_id = project.id
        next_group_id = groups[-1].id
    else:
        # A short batch means the project is exhausted; advance the project
        # cursor past it and reset the group cursor.
        next_project_id = project.id + 1
        next_group_id = 0

    backfill_supergroups_lightweight_for_org.apply_async(
        args=[organization_id],
        kwargs={
            "last_project_id": next_project_id,
            "last_group_id": next_group_id,
        },
        countdown=INTER_BATCH_DELAY_S,
        headers={"sentry-propagate-traces": False},
    )
|
sentry[bot] marked this conversation as resolved.
cursor[bot] marked this conversation as resolved.
|
||
|
|
||
|
|
||
def _batch_fetch_events(groups: Sequence[Group], organization_id: int) -> list[tuple[Group, dict]]:
    """
    Fetch the latest event for each group using batched Snuba queries,
    then serialize each event for sending to Seer.

    Args:
        groups: Groups to resolve events for (all assumed to belong to
            ``organization_id`` — TODO confirm at the caller).
        organization_id: Used for Snuba tenant routing.

    Returns:
        ``(group, serialized_event)`` pairs, preserving the input group order.
        Groups with no matching Snuba row, or whose event fails to serialize,
        are silently dropped — the result may be shorter than ``groups``.
    """
    now = datetime.now(UTC)

    # Build one Snuba request per group to find the latest event_id.
    # NOTE(review): per PR discussion this is one query per group; batching
    # by project with a GROUP BY would reduce query count if rate limits bite.
    snuba_requests = []
    for group in groups:
        # Use a tight window around the group's last_seen to minimize Snuba scan range
        # (assumes last_seen is within 1h of the group's newest event — TODO confirm
        # whether clock skew / late-arriving events can violate this).
        group_start = group.last_seen - timedelta(hours=1)
        snuba_requests.append(
            Request(
                dataset=Dataset.Events.value,
                app_id="supergroups_backfill",
                query=Query(
                    match=Entity(Dataset.Events.value),
                    select=[Column("event_id"), Column("group_id"), Column("project_id")],
                    where=[
                        Condition(Column("project_id"), Op.EQ, group.project_id),
                        Condition(Column("group_id"), Op.EQ, group.id),
                        Condition(Column("timestamp"), Op.GTE, group_start),
                        # Small future skew allowance on the upper bound.
                        Condition(Column("timestamp"), Op.LT, now + timedelta(minutes=5)),
                    ],
                    # Newest event first, limit 1 => latest event for the group.
                    orderby=[OrderBy(Column("timestamp"), Direction.DESC)],
                    limit=Limit(1),
                ),
                tenant_ids={"organization_id": organization_id},
            )
        )

    results = bulk_snuba_queries(
        snuba_requests, referrer="supergroups_backfill_lightweight.get_latest_events"
    )

    # Build unfetched Event objects from Snuba results, keeping groups aligned
    # (bulk_snuba_queries returns results in request order, so zip is positional).
    matched_groups: list[Group] = []
    events: list[Event] = []
    for group, result in zip(groups, results):
        rows = result.get("data", [])
        if not rows:
            # No event found in the window — group is skipped entirely.
            continue
        matched_groups.append(group)
        events.append(
            Event(project_id=group.project_id, event_id=rows[0]["event_id"], group_id=group.id)
        )

    if not events:
        return []

    # Batch fetch all event data from nodestore in one multi-get
    eventstore.bind_nodes(events)

    # Bulk serialize all events
    serialized_events = serialize(events, None, EventSerializer())

    # matched_groups and serialized_events are index-aligned; drop any event
    # that serialized to a falsy value.
    return [
        (group, serialized_event)
        for group, serialized_event in zip(matched_groups, serialized_events)
        if serialized_event
    ]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will this task be scheduled/spawned? Will we be able to spawn them incrementally over time so that we don't generate a big backlog all at once that consumes all the worker capacity preventing other tasks from running?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I currently plan to add a custom run job on getsentry to trigger this manually per org; in the future, when we plan to backfill everything, we will just loop over orgs. I don't plan to run this on multiple orgs in parallel for now.
When this was done for AI grouping v1, I believe we actually did it project by project and basically rate limited it, so it took a long time (months) — but keeping the rate low meant we didn't overload worker capacity or bombard Seer.
Right now this task doesn't use any parallelism and just spawns the next batch after the current batch is done, so I don't think it's capable of consuming all worker capacity for a single org. @wedamija commented on adding a threadpool; I am considering it so this task won't be dead slow, but as you mention, I will indeed need to make sure we don't spawn too many threads — for both Sentry's and Seer's sake.
This task is meant to be a tool to get a few orgs/projects backfilled and to let us build a POC on this lightweight implementation. We will need to do more tweaking to make it efficient when running it for everything.