Skip to content

Commit 2b337ce

Browse files
ref(preprod): Remove Kafka producer and taskbroker feature flag
Taskbroker is now the only dispatch path for preprod artifacts. Remove the Kafka producer module, the launchpad-taskbroker-rollout feature flag, and all associated dead code including the orphaned Kafka topic definition and settings constant. Move PreprodFeature enum from the deleted producer module to quotas where it is actually used. Simplify the rerun analysis endpoint to check run_size directly instead of building an intermediate list. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent b61fe04 commit 2b337ce

File tree

10 files changed

+38
-469
lines changed

10 files changed

+38
-469
lines changed

src/sentry/conf/server.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2691,7 +2691,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
26912691
"shared-resources-usage": "default",
26922692
"buffered-segments": "default",
26932693
"buffered-segments-dlq": "default",
2694-
"preprod-artifact-events": "default",
26952694
# Taskworker topics
26962695
"taskworker": "default",
26972696
"taskworker-dlq": "default",
@@ -3026,8 +3025,6 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
30263025
SENTRY_PROFILE_OCCURRENCES_FUTURES_MAX_LIMIT = 10000
30273026
SENTRY_PROFILE_EAP_FUTURES_MAX_LIMIT = 10000
30283027

3029-
SENTRY_PREPROD_ARTIFACT_EVENTS_FUTURES_MAX_LIMIT = 10000
3030-
30313028
# How long we should wait for a gateway proxy request to return before giving up
30323029
GATEWAY_PROXY_TIMEOUT: int | None = (
30333030
int(os.environ["SENTRY_APIGW_PROXY_TIMEOUT"])

src/sentry/conf/types/kafka_definition.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class Topic(Enum):
5353
INGEST_REPLAYS_RECORDINGS = "ingest-replay-recordings"
5454
INGEST_OCCURRENCES = "ingest-occurrences"
5555
INGEST_MONITORS = "ingest-monitors"
56-
PREPROD_ARTIFACT_EVENTS = "preprod-artifact-events"
5756
MONITORS_CLOCK_TICK = "monitors-clock-tick"
5857
MONITORS_CLOCK_TASKS = "monitors-clock-tasks"
5958
MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences"

src/sentry/features/temporary.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,6 @@ def register_temporary_features(manager: FeatureManager) -> None:
157157
manager.add("organizations:invite-members", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=True)
158158
# Enable rate limits for inviting members.
159159
manager.add("organizations:invite-members-rate-limits", OrganizationFeature, FeatureHandlerStrategy.INTERNAL, default=True, api_expose=False)
160-
# Enable rollout of launchpad taskbroker shadowing/usage
161-
manager.add("organizations:launchpad-taskbroker-rollout", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)
162160
manager.add("organizations:mep-use-default-tags", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)
163161
# Enable flamegraph visualization for MetricKit hang diagnostic stack traces
164162
manager.add("organizations:metrickit-flamegraph", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=True)

src/sentry/preprod/api/endpoints/preprod_artifact_rerun_analysis.py

Lines changed: 11 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from rest_framework.request import Request
99
from rest_framework.response import Response
1010

11-
from sentry import analytics, features
11+
from sentry import analytics
1212
from sentry.api.api_owners import ApiOwner
1313
from sentry.api.api_publish_status import ApiPublishStatus
1414
from sentry.api.base import Endpoint, cell_silo_endpoint, internal_cell_silo_endpoint
@@ -22,8 +22,7 @@
2222
PreprodArtifactSizeComparison,
2323
PreprodArtifactSizeMetrics,
2424
)
25-
from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka
26-
from sentry.preprod.quotas import should_run_distribution, should_run_size
25+
from sentry.preprod.quotas import should_run_size
2726
from sentry.preprod.tasks import dispatch_taskbroker
2827

2928
logger = logging.getLogger(__name__)
@@ -65,47 +64,12 @@ def post(
6564

6665
organization = head_artifact.project.organization
6766

68-
# Empty list is valid - triggers default processing behavior
69-
requested_features: list[PreprodFeature] = []
70-
7167
run_size, _ = should_run_size(head_artifact, actor=request.user)
7268
if run_size:
73-
requested_features.append(PreprodFeature.SIZE_ANALYSIS)
74-
75-
run_distribution, _ = should_run_distribution(head_artifact, actor=request.user)
76-
if run_distribution:
77-
requested_features.append(PreprodFeature.BUILD_DISTRIBUTION)
78-
79-
if PreprodFeature.SIZE_ANALYSIS in requested_features:
8069
cleanup_old_metrics(head_artifact)
8170
reset_artifact_data(head_artifact)
8271

83-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
84-
dispatched = dispatch_taskbroker(
85-
head_artifact.project.id, organization.id, head_artifact_id
86-
)
87-
else:
88-
try:
89-
produce_preprod_artifact_to_kafka(
90-
project_id=head_artifact.project.id,
91-
organization_id=organization.id,
92-
artifact_id=head_artifact_id,
93-
requested_features=requested_features,
94-
)
95-
dispatched = True
96-
except Exception:
97-
logger.exception(
98-
"preprod_artifact.rerun_analysis.dispatch_error",
99-
extra={
100-
"artifact_id": head_artifact_id,
101-
"user_id": request.user.id,
102-
"organization_id": organization.id,
103-
"project_id": head_artifact.project.id,
104-
},
105-
)
106-
dispatched = False
107-
108-
if not dispatched:
72+
if not dispatch_taskbroker(head_artifact.project.id, organization.id, head_artifact_id):
10973
return Response(
11074
{
11175
"detail": f"Failed to queue analysis for artifact {head_artifact_id}",
@@ -182,36 +146,9 @@ def post(self, request: Request) -> Response:
182146
reset_artifact_data(preprod_artifact)
183147

184148
organization = preprod_artifact.project.organization
185-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
186-
dispatched = dispatch_taskbroker(
187-
preprod_artifact.project.id, organization.id, preprod_artifact_id
188-
)
189-
else:
190-
try:
191-
produce_preprod_artifact_to_kafka(
192-
project_id=preprod_artifact.project.id,
193-
organization_id=organization.id,
194-
artifact_id=preprod_artifact_id,
195-
requested_features=[
196-
PreprodFeature.SIZE_ANALYSIS,
197-
PreprodFeature.BUILD_DISTRIBUTION,
198-
],
199-
)
200-
dispatched = True
201-
except Exception as e:
202-
logger.exception(
203-
"preprod_artifact.admin_rerun_analysis.dispatch_error",
204-
extra={
205-
"artifact_id": preprod_artifact_id,
206-
"user_id": request.user.id,
207-
"organization_id": organization.id,
208-
"project_id": preprod_artifact.project.id,
209-
"error": str(e),
210-
},
211-
)
212-
dispatched = False
213-
214-
if not dispatched:
149+
if not dispatch_taskbroker(
150+
preprod_artifact.project.id, organization.id, preprod_artifact_id
151+
):
215152
return Response(
216153
{
217154
"detail": f"Failed to queue analysis for artifact {preprod_artifact_id}",
@@ -303,32 +240,7 @@ def post(self, request: Request) -> Response:
303240
cleanup_stats = cleanup_old_metrics(artifact)
304241
reset_artifact_data(artifact)
305242

306-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
307-
dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id)
308-
else:
309-
try:
310-
produce_preprod_artifact_to_kafka(
311-
project_id=artifact.project.id,
312-
organization_id=organization.id,
313-
artifact_id=artifact_id,
314-
requested_features=[
315-
PreprodFeature.SIZE_ANALYSIS,
316-
PreprodFeature.BUILD_DISTRIBUTION,
317-
],
318-
)
319-
dispatched = True
320-
except Exception:
321-
logger.exception(
322-
"preprod_artifact.admin_batch_rerun_analysis.dispatch_error",
323-
extra={
324-
"artifact_id": artifact_id,
325-
"user_id": request.user.id,
326-
"organization_id": organization.id,
327-
"project_id": artifact.project.id,
328-
},
329-
)
330-
dispatched = False
331-
243+
dispatched = dispatch_taskbroker(artifact.project.id, organization.id, artifact_id)
332244
if not dispatched:
333245
artifact.refresh_from_db()
334246

@@ -367,14 +279,10 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats:
367279
PreprodArtifactSizeMetrics.objects.filter(preprod_artifact=preprod_artifact)
368280
)
369281

370-
file_ids_to_delete = []
371-
372282
if size_metrics:
373283
size_metric_ids = [sm.id for sm in size_metrics]
374284

375-
for size_metric in size_metrics:
376-
if size_metric.analysis_file_id:
377-
file_ids_to_delete.append(size_metric.analysis_file_id)
285+
file_ids_to_delete = [sm.analysis_file_id for sm in size_metrics if sm.analysis_file_id]
378286

379287
comparisons = PreprodArtifactSizeComparison.objects.filter(
380288
head_size_analysis_id__in=size_metric_ids
@@ -392,10 +300,9 @@ def cleanup_old_metrics(preprod_artifact: PreprodArtifact) -> CleanupStats:
392300
id__in=size_metric_ids
393301
).delete()
394302

395-
if file_ids_to_delete:
396-
for file in File.objects.filter(id__in=file_ids_to_delete):
397-
file.delete()
398-
stats.files_total_deleted += 1
303+
for file in File.objects.filter(id__in=file_ids_to_delete):
304+
file.delete()
305+
stats.files_total_deleted += 1
399306

400307
PreprodArtifactSizeMetrics.objects.create(
401308
preprod_artifact=preprod_artifact,

src/sentry/preprod/api/endpoints/project_preprod_artifact_update.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
PreprodArtifactMobileAppInfo,
2727
PreprodArtifactSizeMetrics,
2828
)
29-
from sentry.preprod.producer import PreprodFeature
30-
from sentry.preprod.quotas import should_run_distribution, should_run_size
29+
from sentry.preprod.quotas import PreprodFeature, should_run_distribution, should_run_size
3130
from sentry.preprod.vcs.status_checks.size.tasks import create_preprod_status_check_task
3231

3332
logger = logging.getLogger(__name__)

src/sentry/preprod/producer.py

Lines changed: 0 additions & 63 deletions
This file was deleted.

src/sentry/preprod/quotas.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
from collections.abc import Callable
55
from datetime import datetime, timedelta
6+
from enum import Enum
67
from typing import Any
78

89
import sentry_sdk
@@ -15,11 +16,16 @@
1516
from sentry.models.organization import Organization
1617
from sentry.preprod.artifact_search import artifact_matches_query
1718
from sentry.preprod.models import PreprodArtifact
18-
from sentry.preprod.producer import PreprodFeature
1919
from sentry.users.models.user import User
2020

2121
logger = logging.getLogger(__name__)
2222

23+
24+
class PreprodFeature(Enum):
25+
SIZE_ANALYSIS = "size_analysis"
26+
BUILD_DISTRIBUTION = "build_distribution"
27+
28+
2329
DEFAULT_SIZE_RETENTION_DAYS = 90
2430

2531

src/sentry/preprod/tasks.py

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from django.utils import timezone
1212
from taskbroker_client.retry import Retry
1313

14-
from sentry import features
1514
from sentry.constants import DataCategory
1615
from sentry.models.commitcomparison import CommitComparison
1716
from sentry.models.organization import Organization
@@ -28,7 +27,6 @@
2827
PreprodArtifactSizeMetrics,
2928
PreprodBuildConfiguration,
3029
)
31-
from sentry.preprod.producer import PreprodFeature, produce_preprod_artifact_to_kafka
3230
from sentry.preprod.quotas import (
3331
has_installable_quota,
3432
has_size_quota,
@@ -165,14 +163,9 @@ def assemble_preprod_artifact(
165163
except Exception:
166164
pass
167165

168-
if features.has("organizations:launchpad-taskbroker-rollout", organization):
169-
taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id)
170-
if not taskbroker_dispatched:
171-
return
172-
else:
173-
kafka_dispatched = _dispatch_kafka(project_id, org_id, artifact_id, checksum)
174-
if not kafka_dispatched:
175-
return
166+
taskbroker_dispatched = dispatch_taskbroker(project_id, org_id, artifact_id)
167+
if not taskbroker_dispatched:
168+
return
176169

177170
logger.info(
178171
"Finished preprod artifact dispatch",
@@ -976,47 +969,6 @@ def detect_expired_preprod_artifacts() -> None:
976969
)
977970

978971

979-
def _dispatch_kafka(project_id: int, org_id: int, artifact_id: int, checksum: str) -> bool:
980-
# Note: requested_features is no longer used for filtering - all features are
981-
# requested here, and the actual quota/filter checks happen in the update endpoint
982-
# (project_preprod_artifact_update.py) after preprocessing completes.
983-
try:
984-
produce_preprod_artifact_to_kafka(
985-
project_id=project_id,
986-
organization_id=org_id,
987-
artifact_id=artifact_id,
988-
requested_features=[
989-
PreprodFeature.SIZE_ANALYSIS,
990-
PreprodFeature.BUILD_DISTRIBUTION,
991-
],
992-
)
993-
return True
994-
except Exception as e:
995-
user_friendly_error_message = "Failed to dispatch preprod artifact event for analysis"
996-
sentry_sdk.capture_exception(e)
997-
logger.exception(
998-
user_friendly_error_message,
999-
extra={
1000-
"project_id": project_id,
1001-
"organization_id": org_id,
1002-
"checksum": checksum,
1003-
"preprod_artifact_id": artifact_id,
1004-
},
1005-
)
1006-
PreprodArtifact.objects.filter(id=artifact_id).update(
1007-
state=PreprodArtifact.ArtifactState.FAILED,
1008-
error_code=PreprodArtifact.ErrorCode.ARTIFACT_PROCESSING_ERROR,
1009-
error_message=user_friendly_error_message,
1010-
)
1011-
create_preprod_status_check_task.apply_async(
1012-
kwargs={
1013-
"preprod_artifact_id": artifact_id,
1014-
"caller": "assemble_dispatch_error",
1015-
}
1016-
)
1017-
return False
1018-
1019-
1020972
def dispatch_taskbroker(project_id: int, org_id: int, artifact_id: int) -> bool:
1021973
try:
1022974
logger.info(

0 commit comments

Comments
 (0)