Skip to content

Commit 95f784e

Browse files
committed
feat(pipeline): Add OrganizationPipelineEndpoint for API-driven pipelines
Add a new REST endpoint that allows driving integration pipelines via JSON API requests instead of the legacy redirect-based flow. The endpoint supports initializing a pipeline (POST with action=initialize), retrieving the current step info (GET), and advancing through steps (POST with step-specific data). Uses ControlSiloOrganizationEndpoint as the base class since integration models live in the control silo, requiring the RPC service layer to resolve organizations from slugs. The endpoint rejects pipelines that don't support API mode with a 400, and returns structured errors for invalid providers, missing sessions, and unsupported pipeline names. Refs VDY-36
1 parent 876b9a5 commit 95f784e

File tree

4 files changed

+405
-0
lines changed

4 files changed

+405
-0
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
5+
from rest_framework.request import Request
6+
from rest_framework.response import Response
7+
8+
from sentry.api.api_owners import ApiOwner
9+
from sentry.api.api_publish_status import ApiPublishStatus
10+
from sentry.api.base import control_silo_endpoint
11+
from sentry.api.bases.organization import (
12+
ControlSiloOrganizationEndpoint,
13+
OrganizationPermission,
14+
)
15+
from sentry.exceptions import NotRegistered
16+
from sentry.identity.pipeline import IdentityPipeline
17+
from sentry.integrations.pipeline import (
18+
IntegrationPipeline,
19+
IntegrationPipelineError,
20+
initialize_integration_pipeline,
21+
)
22+
from sentry.organizations.services.organization.model import RpcOrganization
23+
from sentry.pipeline.base import Pipeline
24+
from sentry.pipeline.types import PipelineStepAction
25+
26+
logger = logging.getLogger(__name__)
27+
28+
# All pipeline classes that can be driven via the API. The endpoint tries each
29+
# in order and uses whichever one has a valid session for the request.
30+
PIPELINE_CLASSES = (IntegrationPipeline, IdentityPipeline)
31+
32+
33+
class PipelinePermission(OrganizationPermission):
34+
scope_map = {
35+
"GET": ["org:read", "org:write", "org:admin", "org:integrations"],
36+
"POST": ["org:write", "org:admin", "org:integrations"],
37+
}
38+
39+
40+
def _get_api_pipeline(
41+
request: Request, organization: RpcOrganization, pipeline_name: str
42+
) -> Response | Pipeline:
43+
"""Look up an active API-ready pipeline from the session, or return an error Response."""
44+
pipelines = {cls.pipeline_name: cls for cls in PIPELINE_CLASSES}
45+
if pipeline_name not in pipelines:
46+
return Response({"detail": "Invalid pipeline type"}, status=404)
47+
48+
pipeline = pipelines[pipeline_name].get_for_request(request._request)
49+
if not pipeline or not pipeline.organization:
50+
return Response({"detail": "No active pipeline session."}, status=404)
51+
52+
if not pipeline.is_valid() or pipeline.organization.id != organization.id:
53+
return Response({"detail": "Invalid pipeline state."}, status=404)
54+
55+
if not pipeline.is_api_ready():
56+
return Response({"detail": "Pipeline does not support API mode."}, status=400)
57+
58+
return pipeline
59+
60+
61+
@control_silo_endpoint
62+
class OrganizationPipelineEndpoint(ControlSiloOrganizationEndpoint):
63+
owner = ApiOwner.ENTERPRISE
64+
publish_status = {
65+
"GET": ApiPublishStatus.EXPERIMENTAL,
66+
"POST": ApiPublishStatus.EXPERIMENTAL,
67+
}
68+
permission_classes = (PipelinePermission,)
69+
70+
def get(
71+
self, request: Request, organization: RpcOrganization, pipeline_name: str, **kwargs: object
72+
) -> Response:
73+
result = _get_api_pipeline(request, organization, pipeline_name)
74+
if isinstance(result, Response):
75+
return result
76+
return Response(result.get_current_step_info())
77+
78+
def post(
79+
self, request: Request, organization: RpcOrganization, pipeline_name: str, **kwargs: object
80+
) -> Response:
81+
if request.data.get("action") == "initialize":
82+
return self._initialize_pipeline(request, organization, pipeline_name)
83+
84+
result = _get_api_pipeline(request, organization, pipeline_name)
85+
if isinstance(result, Response):
86+
return result
87+
pipeline = result
88+
89+
step_result = pipeline.api_advance(request._request, request.data)
90+
91+
response_data = step_result.serialize()
92+
if step_result.action == PipelineStepAction.ADVANCE:
93+
response_data.update(pipeline.get_current_step_info())
94+
95+
if step_result.action == PipelineStepAction.ERROR:
96+
return Response(response_data, status=400)
97+
98+
return Response(response_data)
99+
100+
def _initialize_pipeline(
101+
self, request: Request, organization: RpcOrganization, pipeline_name: str
102+
) -> Response:
103+
if pipeline_name != IntegrationPipeline.pipeline_name:
104+
return Response(
105+
{"detail": "Initialization not supported for this pipeline."}, status=400
106+
)
107+
108+
provider_id = request.data.get("provider")
109+
if not provider_id:
110+
return Response({"detail": "provider is required."}, status=400)
111+
112+
try:
113+
pipeline = initialize_integration_pipeline(request._request, organization, provider_id)
114+
except NotRegistered:
115+
return Response({"detail": f"Unknown provider: {provider_id}"}, status=404)
116+
except IntegrationPipelineError as e:
117+
return Response({"detail": str(e)}, status=404 if e.not_found else 400)
118+
119+
if not pipeline.is_api_ready():
120+
return Response({"detail": "Pipeline does not support API mode."}, status=400)
121+
122+
pipeline.set_api_mode()
123+
124+
return Response(pipeline.get_current_step_info())

src/sentry/api/urls.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from sentry.api.endpoints.organization_insights_tree import OrganizationInsightsTreeEndpoint
2020
from sentry.api.endpoints.organization_intercom_jwt import OrganizationIntercomJwtEndpoint
2121
from sentry.api.endpoints.organization_missing_org_members import OrganizationMissingMembersEndpoint
22+
from sentry.api.endpoints.organization_pipeline import OrganizationPipelineEndpoint
2223
from sentry.api.endpoints.organization_plugin_deprecation_info import (
2324
OrganizationPluginDeprecationInfoEndpoint,
2425
)
@@ -2038,6 +2039,11 @@ def create_group_urls(name_prefix: str) -> list[URLPattern | URLResolver]:
20382039
ExternalUserDetailsEndpoint.as_view(),
20392040
name="sentry-api-0-organization-external-user-details",
20402041
),
2042+
re_path(
2043+
r"^(?P<organization_id_or_slug>[^/]+)/pipeline/(?P<pipeline_name>[^/]+)/$",
2044+
OrganizationPipelineEndpoint.as_view(),
2045+
name="sentry-api-0-organization-pipeline",
2046+
),
20412047
re_path(
20422048
r"^(?P<organization_id_or_slug>[^/]+)/integration-requests/$",
20432049
OrganizationIntegrationRequestEndpoint.as_view(),

src/sentry/pipeline/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,14 @@ def is_api_ready(self) -> bool:
275275
"""Returns True if this pipeline supports API mode."""
276276
return self.get_pipeline_api_steps() is not None
277277

278+
@property
279+
def is_api_mode(self) -> bool:
280+
"""Returns True if this pipeline session was initiated via the API."""
281+
return bool(self._fetch_state("api_mode"))
282+
283+
def set_api_mode(self, enabled: bool = True) -> None:
284+
self.bind_state("api_mode", enabled)
285+
278286
def _assert_user_authorization(self) -> None:
279287
assert not (self.state.uid is not None and self.state.uid != self.request.user.id), (
280288
ERR_MISMATCHED_USER

0 commit comments

Comments
 (0)