Skip to content

Commit 39b1ebf

Browse files
ref(preprod): Remove Kafka producer and taskbroker feature flag (#112593)
Taskbroker is now the only dispatch path for preprod artifacts. This PR removes the old Kafka producer path and all associated dead code. **What's removed:** - `src/sentry/preprod/producer.py` — entire module (was already fully commented out) - `launchpad-taskbroker-rollout` feature flag registration - `PREPROD_ARTIFACT_EVENTS` Kafka topic definition and cluster mapping - `SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT` settings constant - All `produce_preprod_artifact_to_kafka` references in tests **What's simplified:** - `PreprodFeature` enum moved from deleted `producer.py` to `quotas.py` - Rerun analysis endpoint now checks `run_size` directly instead of building an intermediate `requested_features` list - `cleanup_old_metrics` uses list comprehension for file ID collection **Tests updated:** - Replaced kafka/flag-gated dispatch tests with direct `dispatch_taskbroker` assertions - Replaced `produce_preprod_artifact_to_kafka` mock tests with `cleanup_old_metrics` mock tests that verify quota-based cleanup behavior Co-authored-by: Claude <noreply@anthropic.com>
1 parent 2408ecd commit 39b1ebf

File tree

10 files changed

+38
-469
lines changed

10 files changed

+38
-469
lines changed

src/sentry/conf/server.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2692,7 +2692,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
26922692
"shared-resources-usage": "default",
26932693
"buffered-segments": "default",
26942694
"buffered-segments-dlq": "default",
2695-
"preprod-artifact-events": "default",
26962695
# Taskworker topics
26972696
"taskworker": "default",
26982697
"taskworker-dlq": "default",
@@ -3027,8 +3026,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
30273026
SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT = 10000
30283027
SENTRY_PROFILE_EAP_FUTURES_MAX_LIMIT = 10000
30293028

3030-
SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000
3031-
30323029
# How long we should wait for a gateway proxy request to return before giving up
30333030
GATEWAY_PROXY_TIMEOUT: int | None = (
30343031
int(os.environ["SENTRY_APIGW_PROXY_TIMEOUT"])

src/sentry/conf/types/kafka_definition.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class Topic(Enum):
5353
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
5454
INGEST_OCCURRENCES = "ingest-occurrences"
5555
INGEST_MONITORS = "ingest-monitors"
56-
PREPROD_ARTIFACT_EVENTS = "preprod-artifact-events"
5756
MONITORS_CLOCK_TICK = "monitors-clock-tick"
5857
MONITORS_CLOCK_TASKS = "monitors-clock-tasks"
5958
MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences"

src/sentry/features/temporary.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,6 @@ def register_temporary_features(manager: FeatureManager) -> None:
157157
manager.add("organizations:invite-members", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=True)
158158
# Enable rate limits for inviting members.
159159
manager.add("organizations:invite-members-rate-limits", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=False)
160-
# Enable rollout of launchpad taskbroker shadowing/usage
161-
manager.add("organizations:launchpad-taskbroker-rollout", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
162160
manager.add("organizations:mep-use-default-tags", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
163161
# Enable flamegraph visualization for MetricKit hang diagnostic stack traces
164162
manager.add("organizations:metrickit-flamegraph", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)

src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py

Lines changed: 11 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from rest_framework.request import Request
99
from rest_framework.response import Response
1010

11-
from sentry import analytics, features
11+
from sentry import analytics
1212
from sentry.api.api_owners import ApiOwner
1313
from sentry.api.api_publish_status import ApiPublishStatus
1414
from sentry.api.base import Endpoint, cell_silo_endpoint, internal_cell_silo_endpoint
@@ -22,8 +22,7 @@
2222
PreprodArtifactSizeComparison,
2323
PreprodArtifactSizeMetrics,
2424
)
25-
from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka
26-
from sentry.preprod.quotas import should_run_distribution, should_run_size
25+
from sentry.preprod.quotas import should_run_size
2726
from sentry.preprod.tasks import dispatch_taskbroker
2827

2928
logger = logging.getLogger(__name__)
@@ -65,47 +64,12 @@ def post(
6564

6665
organization = head_artifact.project.organization
6766

68-
# Empty list is valid - triggers default processing behavior
69-
requested_features: list[PreprodFeature] = []
70-
7167
run_size, _ = should_run_size(head_artifact, actor=request.user)
7268
if run_size:
73-
requested_features.append(PreprodFeature.SIZE_ANALYSIS)
74-
75-
run_distribution, _ = should_run_distribution(head_artifact, actor=request.user)
76-
if run_distribution:
77-
requested_features.append(PreprodFeature.BUILD_DISTRIBUTION)
78-
79-
if PreprodFeature.SIZE_ANALYSIS in requested_features:
8069
cleanup_old_metrics(head_artifact)
8170
reset_artifact_data(head_artifact)
8271

83-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
84-
dispatched = dispatch_taskbroker(
85-
head_artifact.project.id, organization.id, head_artifact_id
86-
)
87-
else:
88-
try:
89-
produce_preprod_artifact_to_kafka(
90-
project_id=head_artifact.project.id,
91-
organization_id=organization.id,
92-
artifact_id=head_artifact_id,
93-
requested_features=requested_features,
94-
)
95-
dispatched = True
96-
except Exception:
97-
logger.exception(
98-
"preprod_artifact.rerun_analysis.dispatch_error",
99-
extra={
100-
"artifact_id": head_artifact_id,
101-
"user_id": request.user.id,
102-
"organization_id": organization.id,
103-
"project_id": head_artifact.project.id,
104-
},
105-
)
106-
dispatched = False
107-
108-
if not dispatched:
72+
if not dispatch_taskbroker(head_artifact.project.id, organization.id, head_artifact_id):
10973
return Response(
11074
{
11175
"detail": f"Failed to queue analysis for artifact {head_artifact_id}",
@@ -182,36 +146,9 @@ def post(self, request: Request) -> Response:
182146
reset_artifact_data(preprod_artifact)
183147

184148
organization = preprod_artifact.project.organization
185-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
186-
dispatched = dispatch_taskbroker(
187-
preprod_artifact.project.id, organization.id, preprod_artifact_id
188-
)
189-
else:
190-
try:
191-
produce_preprod_artifact_to_kafka(
192-
project_id=preprod_artifact.project.id,
193-
organization_id=organization.id,
194-
artifact_id=preprod_artifact_id,
195-
requested_features=[
196-
PreprodFeature.SIZE_ANALYSIS,
197-
PreprodFeature.BUILD_DISTRIBUTION,
198-
],
199-
)
200-
dispatched = True
201-
except Exception as e:
202-
logger.exception(
203-
"preprod_artifact.admin_rerun_analysis.dispatch_error",
204-
extra={
205-
"artifact_id": preprod_artifact_id,
206-
"user_id": request.user.id,
207-
"organization_id": organization.id,
208-
"project_id": preprod_artifact.project.id,
209-
"error": str(e),
210-
},
211-
)
212-
dispatched = False
213-
214-
if not dispatched:
149+
if not dispatch_taskbroker(
150+
preprod_artifact.project.id, organization.id, preprod_artifact_id
151+
):
215152
return Response(
216153
{
217154
"detail": f"Failed to queue analysis for artifact {preprod_artifact_id}",
@@ -303,32 +240,7 @@ def post(self, request: Request) -> Response:
303240
cleanup_stats = cleanup_old_metrics(artifact)
304241
reset_artifact_data(artifact)
305242

306-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
307-
dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id)
308-
else:
309-
try:
310-
produce_preprod_artifact_to_kafka(
311-
project_id=artifact.project.id,
312-
organization_id=organization.id,
313-
artifact_id=artifact_id,
314-
requested_features=[
315-
PreprodFeature.SIZE_ANALYSIS,
316-
PreprodFeature.BUILD_DISTRIBUTION,
317-
],
318-
)
319-
dispatched = True
320-
except Exception:
321-
logger.exception(
322-
"preprod_artifact.admin_batch_rerun_analysis.dispatch_error",
323-
extra={
324-
"artifact_id": artifact_id,
325-
"user_id": request.user.id,
326-
"organization_id": organization.id,
327-
"project_id": artifact.project.id,
328-
},
329-
)
330-
dispatched = False
331-
243+
dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id)
332244
if not dispatched:
333245
artifact.refresh_from_db()
334246

@@ -367,14 +279,10 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats:
367279
PreprodArtifactSizeMetrics.objects.filter(preprod_artifact=preprod_artifact)
368280
)
369281

370-
file_ids_to_delete = []
371-
372282
if size_metrics:
373283
size_metric_ids = [sm.id for sm in size_metrics]
374284

375-
for size_metric in size_metrics:
376-
if size_metric.analysis_file_id:
377-
file_ids_to_delete.append(size_metric.analysis_file_id)
285+
file_ids_to_delete = [sm.analysis_file_id for sm in size_metrics if sm.analysis_file_id]
378286

379287
comparisons = PreprodArtifactSizeComparison.objects.filter(
380288
head_size_analysis_id__in=size_metric_ids
@@ -392,10 +300,9 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats:
392300
id__in=size_metric_ids
393301
).delete()
394302

395-
if file_ids_to_delete:
396-
for file in File.objects.filter(id__in=file_ids_to_delete):
397-
file.delete()
398-
stats.files_total_deleted += 1
303+
for file in File.objects.filter(id__in=file_ids_to_delete):
304+
file.delete()
305+
stats.files_total_deleted += 1
399306

400307
PreprodArtifactSizeMetrics.objects.create(
401308
preprod_artifact=preprod_artifact,

src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
PreprodArtifactMobileAppInfo,
2727
PreprodArtifactSizeMetrics,
2828
)
29-
from sentry.preprod.producer import PreprodFeature
30-
from sentry.preprod.quotas import should_run_distribution, should_run_size
29+
from sentry.preprod.quotas import PreprodFeature, should_run_distribution, should_run_size
3130
from sentry.preprod.vcs.status_checks.size.tasks import create_preprod_status_check_task
3231

3332
logger = logging.getLogger(__name__)

src/sentry/preprod/producer.py

Lines changed: 0 additions & 63 deletions
This file was deleted.

src/sentry/preprod/quotas.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
from collections.abc import Callable
55
from datetime import datetime, timedelta
6+
from enum import Enum
67
from typing import Any
78

89
import sentry_sdk
@@ -15,11 +16,16 @@
1516
from sentry.models.organization import Organization
1617
from sentry.preprod.artifact_search import artifact_matches_query
1718
from sentry.preprod.models import PreprodArtifact
18-
from sentry.preprod.producer import PreprodFeature
1919
from sentry.users.models.user import User
2020

2121
logger = logging.getLogger(__name__)
2222

23+
24+
class PreprodFeature(Enum):
25+
SIZE_ANALYSIS = "size_analysis"
26+
BUILD_DISTRIBUTION = "build_distribution"
27+
28+
2329
DEFAULT_SIZE_RETENTION_DAYS = 90
2430

2531

src/sentry/preprod/tasks.py

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from django.utils import timezone
1212
from taskbroker_client.retry import Retry
1313

14-
from sentry import features
1514
from sentry.constants import DataCategory
1615
from sentry.models.commitcomparison import CommitComparison
1716
from sentry.models.organization import Organization
@@ -28,7 +27,6 @@
2827
PreprodArtifactSizeMetrics,
2928
PreprodBuildConfiguration,
3029
)
31-
from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka
3230
from sentry.preprod.quotas import (
3331
has_installable_quota,
3432
has_size_quota,
@@ -165,14 +163,9 @@ def assemble_preprod_artifact(
165163
except Exception:
166164
pass
167165

168-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
169-
taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id)
170-
if not taskbroker_dispatched:
171-
return
172-
else:
173-
kafka_dispatched = _dispatch_kafka(project_id, org_id, artifact_id, checksum)
174-
if not kafka_dispatched:
175-
return
166+
taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id)
167+
if not taskbroker_dispatched:
168+
return
176169

177170
logger.info(
178171
"Finished preprod artifact dispatch",
@@ -976,47 +969,6 @@ def detect_expired_preprod_artifacts() -> None:
976969
)
977970

978971

979-
def _dispatch_kafka(project_id: int, org_id: int, artifact_id: int, checksum: str) -> bool:
980-
# Note: requested_features is no longer used for filtering - all features are
981-
# requested here, and the actual quota/filter checks happen in the update endpoint
982-
# (project_preprod_artifact_update.py) after preprocessing completes.
983-
try:
984-
produce_preprod_artifact_to_kafka(
985-
project_id=project_id,
986-
organization_id=org_id,
987-
artifact_id=artifact_id,
988-
requested_features=[
989-
PreprodFeature.SIZE_ANALYSIS,
990-
PreprodFeature.BUILD_DISTRIBUTION,
991-
],
992-
)
993-
return True
994-
except Exception as e:
995-
user_friendly_error_message = "Failed to dispatch preprod artifact event for analysis"
996-
sentry_sdk.capture_exception(e)
997-
logger.exception(
998-
user_friendly_error_message,
999-
extra={
1000-
"project_id": project_id,
1001-
"organization_id": org_id,
1002-
"checksum": checksum,
1003-
"preprod_artifact_id": artifact_id,
1004-
},
1005-
)
1006-
PreprodArtifact.objects.filter(id=artifact_id).update(
1007-
state=PreprodArtifact.ArtifactState.FAILED,
1008-
error_code=PreprodArtifact.ErrorCode.ARTIFACT_PROCESSING_ERROR,
1009-
error_message=user_friendly_error_message,
1010-
)
1011-
create_preprod_status_check_task.apply_async(
1012-
kwargs={
1013-
"preprod_artifact_id": artifact_id,
1014-
"caller": "assemble_dispatch_error",
1015-
}
1016-
)
1017-
return False
1018-
1019-
1020972
def dispatch_taskbroker(project_id: int, org_id: int, artifact_id: int) -> bool:
1021973
try:
1022974
logger.info(

0 commit comments

Comments
 (0)