Skip to content

Commit ab46a65

Browse files
committed
feat(pipeline): Add OrganizationPipelineEndpoint for API-driven pipelines
Add a new REST endpoint that allows driving integration pipelines via JSON API requests instead of the legacy redirect-based flow. The endpoint supports initializing a pipeline (POST with action=initialize), retrieving the current step info (GET), and advancing through steps (POST with step-specific data). Uses ControlSiloOrganizationEndpoint as the base class since integration models live in the control silo, requiring the RPC service layer to resolve organizations from slugs. The endpoint rejects pipelines that don't support API mode with a 400, and returns structured errors for invalid providers, missing sessions, and unsupported pipeline names. Refs VDY-36
1 parent 36f6e8a commit ab46a65

File tree

3 files changed

+398
-0
lines changed

3 files changed

+398
-0
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
5+
from rest_framework.request import Request
6+
from rest_framework.response import Response
7+
8+
from sentry.api.api_owners import ApiOwner
9+
from sentry.api.api_publish_status import ApiPublishStatus
10+
from sentry.api.base import control_silo_endpoint
11+
from sentry.api.bases.organization import (
12+
ControlSiloOrganizationEndpoint,
13+
OrganizationPermission,
14+
)
15+
from sentry.exceptions import NotRegistered
16+
from sentry.identity.pipeline import IdentityPipeline
17+
from sentry.integrations.pipeline import (
18+
IntegrationPipeline,
19+
IntegrationPipelineError,
20+
initialize_integration_pipeline,
21+
)
22+
from sentry.organizations.services.organization.model import RpcOrganization
23+
from sentry.pipeline.base import Pipeline
24+
from sentry.pipeline.types import PipelineStepAction
25+
26+
logger = logging.getLogger(__name__)
27+
28+
# All pipeline classes that can be driven via the API. The endpoint tries each
29+
# in order and uses whichever one has a valid session for the request.
30+
PIPELINE_CLASSES = (IntegrationPipeline, IdentityPipeline)
31+
32+
33+
class PipelinePermission(OrganizationPermission):
34+
scope_map = {
35+
"GET": ["org:read", "org:write", "org:admin", "org:integrations"],
36+
"POST": ["org:write", "org:admin", "org:integrations"],
37+
}
38+
39+
40+
def _get_api_pipeline(
41+
request: Request, organization: RpcOrganization, pipeline_name: str
42+
) -> Response | Pipeline:
43+
"""Look up an active API-ready pipeline from the session, or return an error Response."""
44+
try:
45+
pipeline_cls = next(cls for cls in PIPELINE_CLASSES if cls.pipeline_name == pipeline_name)
46+
except StopIteration:
47+
return Response({"detail": "Invalid pipeline type"}, status=404)
48+
49+
pipeline = pipeline_cls.get_for_request(request._request)
50+
if not pipeline or not pipeline.organization:
51+
return Response({"detail": "No active pipeline session."}, status=404)
52+
53+
if not pipeline.is_valid() or pipeline.organization.id != organization.id:
54+
return Response({"detail": "Invalid pipeline state."}, status=404)
55+
56+
if not pipeline.is_api_ready():
57+
return Response({"detail": "Pipeline does not support API mode."}, status=400)
58+
59+
return pipeline
60+
61+
62+
@control_silo_endpoint
63+
class OrganizationPipelineEndpoint(ControlSiloOrganizationEndpoint):
64+
owner = ApiOwner.ENTERPRISE
65+
publish_status = {
66+
"GET": ApiPublishStatus.EXPERIMENTAL,
67+
"POST": ApiPublishStatus.EXPERIMENTAL,
68+
}
69+
permission_classes = (PipelinePermission,)
70+
71+
def get(
72+
self, request: Request, organization: RpcOrganization, pipeline_name: str, **kwargs: object
73+
) -> Response:
74+
result = _get_api_pipeline(request, organization, pipeline_name)
75+
if isinstance(result, Response):
76+
return result
77+
return Response(result.get_current_step_info())
78+
79+
def post(
80+
self, request: Request, organization: RpcOrganization, pipeline_name: str, **kwargs: object
81+
) -> Response:
82+
if request.data.get("action") == "initialize":
83+
return self._initialize_pipeline(request, organization, pipeline_name)
84+
85+
result = _get_api_pipeline(request, organization, pipeline_name)
86+
if isinstance(result, Response):
87+
return result
88+
pipeline = result
89+
90+
step_result = pipeline.api_advance(request._request, request.data)
91+
92+
response_data = step_result.serialize()
93+
if step_result.action == PipelineStepAction.ADVANCE:
94+
response_data.update(pipeline.get_current_step_info())
95+
96+
if step_result.action == PipelineStepAction.ERROR:
97+
return Response(response_data, status=400)
98+
99+
return Response(response_data)
100+
101+
def _initialize_pipeline(
102+
self, request: Request, organization: RpcOrganization, pipeline_name: str
103+
) -> Response:
104+
if pipeline_name != IntegrationPipeline.pipeline_name:
105+
return Response(
106+
{"detail": "Initialization not supported for this pipeline."}, status=400
107+
)
108+
109+
provider_id = request.data.get("provider")
110+
if not provider_id:
111+
return Response({"detail": "provider is required."}, status=400)
112+
113+
try:
114+
pipeline = initialize_integration_pipeline(request._request, organization, provider_id)
115+
except NotRegistered:
116+
return Response({"detail": f"Unknown provider: {provider_id}"}, status=404)
117+
except IntegrationPipelineError as e:
118+
return Response({"detail": str(e)}, status=404 if e.not_found else 400)
119+
120+
if not pipeline.is_api_ready():
121+
return Response({"detail": "Pipeline does not support API mode."}, status=400)
122+
123+
pipeline.set_api_mode()
124+
125+
return Response(pipeline.get_current_step_info())

src/sentry/api/urls.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from sentry.api.endpoints.organization_insights_tree import OrganizationInsightsTreeEndpoint
2020
from sentry.api.endpoints.organization_intercom_jwt import OrganizationIntercomJwtEndpoint
2121
from sentry.api.endpoints.organization_missing_org_members import OrganizationMissingMembersEndpoint
22+
from sentry.api.endpoints.organization_pipeline import OrganizationPipelineEndpoint
2223
from sentry.api.endpoints.organization_plugin_deprecation_info import (
2324
OrganizationPluginDeprecationInfoEndpoint,
2425
)
@@ -2038,6 +2039,11 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
20382039
ExternalUserDetailsEndpoint.as_view(),
20392040
name="sentry-api-0-organization-external-user-details",
20402041
),
2042+
re_path(
2043+
r"^(?P<organization_id_or_slug>[^/]+)/pipeline/(?P<pipeline_name>[^/]+)/$",
2044+
OrganizationPipelineEndpoint.as_view(),
2045+
name="sentry-api-0-organization-pipeline",
2046+
),
20412047
re_path(
20422048
r"^(?P<organization_id_or_slug>[^/]+)/integration-requests/$",
20432049
OrganizationIntegrationRequestEndpoint.as_view(),

0 commit comments

Comments
 (0)