Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sentry/tasks/llm_issue_detection/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from sentry.tasks.llm_issue_detection.detection import (
DetectedIssue,
create_issue_occurrence_from_detection,
detect_llm_issues_for_project,
detect_llm_issues_for_org,
run_llm_issue_detection,
)

__all__ = [
"DetectedIssue",
"create_issue_occurrence_from_detection",
"detect_llm_issues_for_project",
"detect_llm_issues_for_org",
"run_llm_issue_detection",
]
124 changes: 74 additions & 50 deletions src/sentry/tasks/llm_issue_detection/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from urllib3 import BaseHTTPResponse

from sentry import features, options
from sentry.constants import VALID_PLATFORMS
from sentry.constants import VALID_PLATFORMS, ObjectStatus
from sentry.issues.grouptype import (
AIDetectedCodeHealthGroupType,
AIDetectedDBGroupType,
Expand All @@ -25,12 +25,15 @@
)
from sentry.issues.issue_occurrence import IssueEvidence, IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
from sentry.models.organization import Organization, OrganizationStatus
from sentry.models.project import Project
from sentry.net.http import connection_from_url
from sentry.seer.explorer.utils import normalize_description
from sentry.seer.signed_seer_api import SeerViewerContext, make_signed_seer_api_request
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import issues_tasks
from sentry.utils.hashlib import md5_text
from sentry.utils.query import RangeQuerySetWrapper
from sentry.utils.redis import redis_clusters

logger = logging.getLogger("sentry.tasks.llm_issue_detection")
Expand All @@ -39,13 +42,14 @@
SEER_TIMEOUT_S = 10
START_TIME_DELTA_MINUTES = 60
TRANSACTION_BATCH_SIZE = 50
NUM_TRANSACTIONS_TO_PROCESS = 10
TRACE_PROCESSING_TTL_SECONDS = 7200
# Character limit for LLM-generated fields to protect against abuse.
# Word limits are enforced by Seer's prompt (see seer/automation/issue_detection/analyze.py).
# This limit prevents excessively long outputs from malicious or malfunctioning LLMs.
MAX_LLM_FIELD_LENGTH = 2000

DISPATCH_INTERVAL_MINUTES = 15
NUM_DISPATCH_SLOTS = 10
Comment thread
roggenkemper marked this conversation as resolved.
MAX_ORGS_PER_CYCLE = 500
ORG_DISPATCH_STAGGER_SECONDS = 15


seer_issue_detection_connection_pool = connection_from_url(
settings.SEER_AUTOFIX_URL,
Expand Down Expand Up @@ -260,13 +264,16 @@
)


def get_enabled_project_ids() -> list[int]:
"""
Get the list of project IDs that are explicitly enabled for LLM detection.
def _get_current_dispatch_slot() -> int:
    """Return the index of the dispatch slot covering the current time.

    Wall-clock time is divided into DISPATCH_INTERVAL_MINUTES-long cycles;
    cycles rotate round-robin through NUM_DISPATCH_SLOTS slot indices.
    """
    epoch_minutes = int(datetime.now(UTC).timestamp()) // 60
    cycle_index = epoch_minutes // DISPATCH_INTERVAL_MINUTES
    return cycle_index % NUM_DISPATCH_SLOTS

Returns the allowlist from system options.
"""
return options.get("issue-detection.llm-detection.projects-allowlist")

def _org_in_slot(org_id: int, slot: int) -> bool:
    """Return True when ``org_id`` hashes into the given dispatch slot.

    The org id's md5 digest, taken modulo NUM_DISPATCH_SLOTS, gives each org
    a stable slot assignment so dispatches spread evenly across cycles.
    """
    digest_value = int(md5_text(str(org_id)).hexdigest(), 16)
    return digest_value % NUM_DISPATCH_SLOTS == slot


@instrumented_task(
Expand All @@ -277,52 +284,83 @@
def run_llm_issue_detection() -> None:
"""
Main scheduled task for LLM issue detection.

Uses md5 hash bucketing to spread org dispatches across time slots.
Each 15-minute cycle processes one slot's worth of orgs.
"""
if not options.get("issue-detection.llm-detection.enabled"):
return

enabled_project_ids = get_enabled_project_ids()
if not enabled_project_ids:
return

# Spawn a sub-task for each project with staggered delays
for index, project_id in enumerate(enabled_project_ids):
detect_llm_issues_for_project.apply_async(
args=[project_id],
countdown=index * 90,
current_slot = _get_current_dispatch_slot()
dispatched = 0

for org in RangeQuerySetWrapper(
Organization.objects.filter(status=OrganizationStatus.ACTIVE),
):
if dispatched >= MAX_ORGS_PER_CYCLE:
break

if (
not _org_in_slot(org.id, current_slot)
or not features.has("organizations:ai-issue-detection", org)
or not features.has("organizations:gen-ai-features", org)
or org.get_option("sentry:hide_ai_features")
):
continue

detect_llm_issues_for_org.apply_async(
args=[org.id],
countdown=dispatched * ORG_DISPATCH_STAGGER_SECONDS,
headers={"sentry-propagate-traces": False},
)
dispatched += 1

sentry_sdk.metrics.count(
"llm_issue_detection.orgs_dispatched",
dispatched,
attributes={"slot": current_slot},
)


@instrumented_task(
name="sentry.tasks.llm_issue_detection.detect_llm_issues_for_project",
name="sentry.tasks.llm_issue_detection.detect_llm_issues_for_org",
namespace=issues_tasks,
processing_deadline_duration=180, # 3 minutes
)
def detect_llm_issues_for_project(project_id: int) -> None:
def detect_llm_issues_for_org(org_id: int) -> None:
"""
Process a single project for LLM issue detection.
Process a single organization for LLM issue detection.

Gets the project's top TRANSACTION_BATCH_SIZE transaction spans from the last START_TIME_DELTA_MINUTES, sorted by -sum(span.duration).
From those transactions, dedupes on normalized transaction_name.
For each deduped transaction, gets first trace_id from the start of time window, which has small random variation.
Sends these trace_ids to seer, which uses get_trace_waterfall to construct an EAPTrace to analyze.
Picks one random active project, selects 1 trace, and sends to Seer.
Budget enforcement happens on the Seer side.
"""
from sentry.tasks.llm_issue_detection.trace_data import ( # circular imports
get_project_top_transaction_traces_for_llm_detection,
)

project = Project.objects.get_from_cache(id=project_id)
organization = project.organization
organization_id = organization.id
organization_slug = organization.slug
try:
organization = Organization.objects.get_from_cache(id=org_id)
except Organization.DoesNotExist:
return

has_access = features.has("organizations:gen-ai-features", organization) and not bool(
organization.get_option("sentry:hide_ai_features")
)
if not has_access:
return

projects = list(
Project.objects.filter(
organization_id=org_id,
status=ObjectStatus.ACTIVE,
).values_list("id", flat=True)
)
if not projects:
return

project_id = random.choice(projects)

project = Project.objects.get_from_cache(id=project_id)

Check warning on line 363 in src/sentry/tasks/llm_issue_detection/detection.py

View check run for this annotation

@sentry/warden / warden: sentry-backend-bugs

Project.DoesNotExist not handled in detect_llm_issues_for_org

The `Project.objects.get_from_cache(id=project_id)` call on line 363 does not handle `DoesNotExist`. While the project ID comes from a filter query moments earlier (lines 352-357), there is a race window where the project could be deleted or its status changed between the query and the cache lookup. In background tasks, unhandled DoesNotExist exceptions cause task failures and retries.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
perf_settings = project.get_option("sentry:performance_issue_settings", default={})
if not perf_settings.get("ai_issue_detection_enabled", True):
return
Expand All @@ -333,20 +371,17 @@
if not evidence_traces:
return

# Shuffle to randomize selection
random.shuffle(evidence_traces)

# Bulk check which traces are already processed
all_trace_ids = [t.trace_id for t in evidence_traces]
unprocessed_ids = _get_unprocessed_traces(all_trace_ids)
skipped = len(all_trace_ids) - len(unprocessed_ids)
if skipped:
sentry_sdk.metrics.count("llm_issue_detection.trace.skipped", skipped)

# Take up to NUM_TRANSACTIONS_TO_PROCESS
traces_to_send: list[TraceMetadataWithSpanCount] = [
t for t in evidence_traces if t.trace_id in unprocessed_ids
][:NUM_TRANSACTIONS_TO_PROCESS]
][:1]

if not traces_to_send:
return
Expand All @@ -359,12 +394,12 @@

seer_request = IssueDetectionRequest(
traces=traces_to_send,
organization_id=organization_id,
organization_id=org_id,
project_id=project_id,
org_slug=organization_slug,
org_slug=organization.slug,
)

viewer_context = SeerViewerContext(organization_id=organization_id)
viewer_context = SeerViewerContext(organization_id=org_id)
response = make_issue_detection_request(
seer_request,
timeout=SEER_TIMEOUT_S,
Expand All @@ -374,25 +409,14 @@

if response.status == 202:
mark_traces_as_processed([trace.trace_id for trace in traces_to_send])

logger.info(
"llm_issue_detection.request_accepted",
extra={
"project_id": project_id,
"organization_id": organization_id,
"trace_count": len(traces_to_send),
},
)
return

# Log (+ send to sentry) unexpected responses
logger.error(
"llm_issue_detection.unexpected_response",
extra={
"status_code": response.status,
"response_data": response.data,
"project_id": project_id,
"organization_id": organization_id,
"trace_count": len(traces_to_send),
"organization_id": org_id,
},
)
Loading
Loading