Skip to content

Commit 98f96d9

Browse files
committed
chore: move async apigateway in own module
1 parent 9301cda commit 98f96d9

File tree

16 files changed

+494
-129
lines changed

16 files changed

+494
-129
lines changed

src/sentry/conf/server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,12 @@ def env(
371371
# so that responses aren't modified after Content-Length is set, or have the
372372
# response modifying middleware reset the Content-Length header.
373373
# This is because CommonMiddleware Sets the Content-Length header for non-streaming responses.
374+
APIGW_ASYNC = os.environ.get("SENTRY_ASYNC_APIGW", "").lower() in ("1", "true", "y", "yes")
375+
APIGW_MIDDLEWARE = (
376+
"sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware"
377+
if APIGW_ASYNC
378+
else "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware"
379+
)
374380
MIDDLEWARE: tuple[str, ...] = (
375381
"csp.middleware.CSPMiddleware",
376382
"sentry.middleware.health.HealthCheck",
@@ -387,7 +393,7 @@ def env(
387393
"sentry.middleware.auth.AuthenticationMiddleware",
388394
"sentry.middleware.ai_agent.AIAgentMiddleware",
389395
"sentry.middleware.integrations.IntegrationControlMiddleware",
390-
"sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware",
396+
APIGW_MIDDLEWARE,
391397
"sentry.middleware.demo_mode_guard.DemoModeGuardMiddleware",
392398
"sentry.middleware.customer_domain.CustomerDomainMiddleware",
393399
"sentry.middleware.sudo.SudoMiddleware",

src/sentry/hybridcloud/apigateway/apigateway.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@ def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset
3030
return endpoint_silo_limit.modes
3131

3232

33-
async def proxy_request_if_needed(
34-
request: Request,
35-
view_func: Callable[..., HttpResponseBase],
36-
view_kwargs: dict[str, Any],
33+
def proxy_request_if_needed(
34+
request: Request, view_func: Callable[..., HttpResponseBase], view_kwargs: dict[str, Any]
3735
) -> HttpResponseBase | None:
3836
"""
3937
Main execution flow for the API Gateway.
@@ -63,7 +61,7 @@ async def proxy_request_if_needed(
6361
"kind": "orgslug",
6462
},
6563
)
66-
return await proxy_request(request, org_id_or_slug, url_name)
64+
return proxy_request(request, org_id_or_slug, url_name)
6765

6866
if url_name == "sentry-error-page-embed" and "dsn" in request.GET:
6967
# Error embed modal is special as customers can't easily use cell URLs.
@@ -75,7 +73,7 @@ async def proxy_request_if_needed(
7573
"kind": "error-embed",
7674
},
7775
)
78-
return await proxy_error_embed_request(request, dsn, url_name)
76+
return proxy_error_embed_request(request, dsn, url_name)
7977

8078
if (
8179
request.resolver_match
@@ -90,7 +88,7 @@ async def proxy_request_if_needed(
9088
},
9189
)
9290

93-
return await proxy_cell_request(request, cell, url_name)
91+
return proxy_cell_request(request, cell, url_name)
9492

9593
if url_name != "unknown":
9694
# If we know the URL but didn't proxy it record we could be missing
Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
from __future__ import annotations
22

3-
import asyncio
43
from collections.abc import Callable
54
from typing import Any
65

7-
from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction
86
from django.http.response import HttpResponseBase
97
from rest_framework.request import Request
108

@@ -14,65 +12,21 @@
1412
class ApiGatewayMiddleware:
1513
"""Proxy requests intended for remote silos"""
1614

17-
async_capable = True
18-
sync_capable = True
19-
2015
def __init__(self, get_response: Callable[[Request], HttpResponseBase]):
2116
self.get_response = get_response
22-
if iscoroutinefunction(self.get_response):
23-
markcoroutinefunction(self)
2417

25-
def __call__(self, request: Request) -> Any:
26-
if iscoroutinefunction(self):
27-
return self.__acall__(request)
18+
def __call__(self, request: Request) -> HttpResponseBase:
2819
return self.get_response(request)
2920

30-
async def __acall__(self, request: Request) -> HttpResponseBase:
31-
return await self.get_response(request) # type: ignore[misc]
32-
3321
def process_view(
3422
self,
3523
request: Request,
3624
view_func: Callable[..., HttpResponseBase],
3725
view_args: tuple[str],
3826
view_kwargs: dict[str, Any],
3927
) -> HttpResponseBase | None:
40-
return self._process_view_match(request, view_func, view_args, view_kwargs)
41-
42-
def _process_view_match(
43-
self,
44-
request: Request,
45-
view_func: Callable[..., HttpResponseBase],
46-
view_args: tuple[str],
47-
view_kwargs: dict[str, Any],
48-
) -> Any:
49-
#: we check if we're in an async or sync runtime once, then
50-
# overwrite the method with the actual impl.
51-
try:
52-
asyncio.get_running_loop()
53-
method = self._process_view_inner
54-
except RuntimeError:
55-
method = self._process_view_sync # type: ignore[assignment]
56-
setattr(self, "_process_view_match", method)
57-
return method(request, view_func, view_args, view_kwargs)
58-
59-
def _process_view_sync(
60-
self,
61-
request: Request,
62-
view_func: Callable[..., HttpResponseBase],
63-
view_args: tuple[str],
64-
view_kwargs: dict[str, Any],
65-
) -> HttpResponseBase | None:
66-
return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs)
67-
68-
async def _process_view_inner(
69-
self,
70-
request: Request,
71-
view_func: Callable[..., HttpResponseBase],
72-
view_args: tuple[str],
73-
view_kwargs: dict[str, Any],
74-
) -> HttpResponseBase | None:
75-
proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs)
28+
proxy_response = proxy_request_if_needed(request, view_func, view_kwargs)
7629
if proxy_response is not None:
7730
return proxy_response
78-
return None
31+
else:
32+
return None

src/sentry/hybridcloud/apigateway/proxy.py

Lines changed: 85 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,21 @@
44

55
from __future__ import annotations
66

7-
import asyncio
87
import logging
9-
from collections.abc import AsyncGenerator, AsyncIterator
8+
from collections.abc import Generator
109
from urllib.parse import urljoin, urlparse
1110
from wsgiref.util import is_hop_by_hop
1211

13-
import httpx
14-
from asgiref.sync import sync_to_async
1512
from django.conf import settings
16-
from django.http import HttpRequest, HttpResponse, StreamingHttpResponse
13+
from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse
1714
from django.http.response import HttpResponseBase
15+
from requests import Response as ExternalResponse
16+
from requests import request as external_request
17+
from requests.exceptions import Timeout
1818

1919
from sentry import options
2020
from sentry.api.exceptions import RequestTimeout
21-
from sentry.objectstore.endpoints.organization import get_raw_body_async
21+
from sentry.objectstore.endpoints.organization import ChunkedEncodingDecoder, get_raw_body
2222
from sentry.silo.util import (
2323
PROXY_APIGATEWAY_HEADER,
2424
PROXY_DIRECT_LOCATION_HEADER,
@@ -32,12 +32,11 @@
3232
get_cell_for_organization,
3333
)
3434
from sentry.utils import metrics
35-
from sentry.utils.http import BodyAsyncWrapper
35+
from sentry.utils.circuit_breaker2 import CircuitBreaker, CountBasedTripStrategy
36+
from sentry.utils.http import BodyWithLength
3637

3738
logger = logging.getLogger(__name__)
3839

39-
proxy_client = httpx.AsyncClient()
40-
4140
# Endpoints that handle uploaded files have higher timeouts configured
4241
# and we need to honor those timeouts when proxying.
4342
# See frontend/templates/sites-enabled/sentry.io in getsentry/ops
@@ -56,57 +55,41 @@
5655
PROXY_CHUNK_SIZE = 512 * 1024
5756

5857

59-
async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator[bytes]:
60-
"""Yield chunks from an httpx response and close the connection when done."""
61-
try:
62-
async for chunk in response.aiter_bytes(PROXY_CHUNK_SIZE):
63-
yield chunk
64-
finally:
65-
await response.aclose()
66-
67-
68-
def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpResponse:
69-
"""Convert an httpx Response into a Django response."""
58+
def _parse_response(response: ExternalResponse, remote_url: str) -> StreamingHttpResponse:
59+
"""
60+
Convert the Responses class from requests into the drf Response
61+
"""
7062

71-
new_headers = clean_outbound_headers(response.headers)
72-
content_type = new_headers.pop("Content-Type", None)
63+
def stream_response() -> Generator[bytes]:
64+
yield from response.iter_content(PROXY_CHUNK_SIZE)
7365

7466
streamed_response = StreamingHttpResponse(
75-
streaming_content=_stream_response_and_close(response),
67+
streaming_content=stream_response(),
7668
status=response.status_code,
77-
content_type=content_type,
69+
content_type=response.headers.pop("Content-Type", None),
7870
)
79-
80-
for header, value in new_headers.items():
71+
# Add Headers to response
72+
for header, value in response.headers.items():
8173
if not is_hop_by_hop(header):
8274
streamed_response[header] = value
8375

8476
streamed_response[PROXY_DIRECT_LOCATION_HEADER] = remote_url
8577
return streamed_response
8678

8779

88-
async def _stream_request(body: AsyncIterator[bytes]) -> AsyncGenerator[bytes]:
89-
async for chunk in body:
90-
yield chunk
91-
92-
93-
async def proxy_request(
94-
request: HttpRequest,
95-
org_id_or_slug: str,
96-
url_name: str,
97-
) -> HttpResponseBase:
80+
def proxy_request(request: HttpRequest, org_id_or_slug: str, url_name: str) -> HttpResponseBase:
9881
"""Take a django request object and proxy it to a remote location given an org_id_or_slug"""
9982

10083
try:
101-
cell = await sync_to_async(get_cell_for_organization)(org_id_or_slug)
84+
cell = get_cell_for_organization(org_id_or_slug)
10285
except CellResolutionError as e:
10386
logger.info("region_resolution_error", extra={"org_slug": org_id_or_slug, "error": str(e)})
10487
return HttpResponse(status=404)
10588

106-
return await proxy_cell_request(request, cell, url_name)
89+
return proxy_cell_request(request, cell, url_name)
10790

10891

109-
async def proxy_error_embed_request(
92+
def proxy_error_embed_request(
11093
request: HttpRequest, dsn: str, url_name: str
11194
) -> HttpResponseBase | None:
11295
try:
@@ -126,65 +109,104 @@ async def proxy_error_embed_request(
126109
# If we don't have a o123.ingest.{cell}.{app_host} style domain
127110
# we forward to the monolith cell
128111
cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION)
129-
return await proxy_cell_request(request, cell, url_name)
112+
return proxy_cell_request(request, cell, url_name)
130113
try:
131114
cell_offset = len(app_segments) + 1
132115
cell_segment = host_segments[cell_offset * -1]
133116
cell = get_cell_by_name(cell_segment)
134117
except Exception:
135118
return None
136119

137-
return await proxy_cell_request(request, cell, url_name)
120+
return proxy_cell_request(request, cell, url_name)
138121

139122

140-
async def proxy_cell_request(
141-
request: HttpRequest,
142-
cell: Cell,
143-
url_name: str,
144-
) -> StreamingHttpResponse:
123+
def proxy_cell_request(request: HttpRequest, cell: Cell, url_name: str) -> HttpResponseBase:
145124
"""Take a django request object and proxy it to a cell silo"""
125+
126+
metric_tags = {"region": cell.name, "url_name": url_name}
127+
circuit_breaker: CircuitBreaker | None = None
128+
# TODO(mark) remove rollout options
129+
if options.get("apigateway.proxy.circuit-breaker.enabled"):
130+
try:
131+
circuit_breaker = CircuitBreaker(
132+
key=f"apigateway.proxy.{cell.name}",
133+
config=options.get("apigateway.proxy.circuit-breaker.config"),
134+
trip_strategy=CountBasedTripStrategy.from_config(
135+
options.get("apigateway.proxy.circuit-breaker.config")
136+
),
137+
)
138+
except Exception as e:
139+
logger.warning("apigateway.invalid-breaker-config", extra={"message": str(e)})
140+
141+
if circuit_breaker is not None:
142+
if not circuit_breaker.should_allow_request():
143+
metrics.incr("apigateway.proxy.circuit_breaker.rejected", tags=metric_tags)
144+
if options.get("apigateway.proxy.circuit-breaker.enforce"):
145+
body = {
146+
"error": "apigateway",
147+
"detail": "Downstream service temporarily unavailable",
148+
}
149+
return JsonResponse(body, status=503)
150+
146151
target_url = urljoin(cell.address, request.path)
147152

148153
content_encoding = request.headers.get("Content-Encoding")
149-
content_length = request.headers.get("Content-Length")
150154
header_dict = clean_proxy_headers(request.headers)
151155
header_dict[PROXY_APIGATEWAY_HEADER] = "true"
152156

157+
# TODO: use requests session for connection pooling capabilities
153158
assert request.method is not None
154159
query_params = request.GET
155160

156-
timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT)
157-
metric_tags = {"region": cell.name, "url_name": url_name}
161+
# This option has a default of None, which is cast to 0
162+
timeout = options.get("apigateway.proxy.timeout")
163+
if not timeout:
164+
timeout = settings.GATEWAY_PROXY_TIMEOUT
165+
timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, timeout)
158166

159167
# XXX: See sentry.testutils.pytest.sentry for more information
160168
if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"):
161169
return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404)
162170

171+
data: bytes | Generator[bytes] | ChunkedEncodingDecoder | BodyWithLength | None = None
163172
if url_name == "sentry-api-0-organization-objectstore":
164173
if content_encoding:
165174
header_dict["Content-Encoding"] = content_encoding
166-
data = get_raw_body_async(request)
175+
data = get_raw_body(request)
167176
else:
168-
data = BodyAsyncWrapper(request.body)
169-
# With request streaming, and without `Content-Length` header,
170-
# `httpx` will set chunked transfer encoding.
171-
# Upstream doesn't necessarily support this,
172-
# thus we re-add the header if it was present in the original request.
173-
if content_length:
174-
header_dict["Content-Length"] = content_length
177+
data = BodyWithLength(request)
175178

176179
try:
177180
with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags):
178-
req = proxy_client.build_request(
181+
resp = external_request(
179182
request.method,
180-
target_url,
183+
url=target_url,
181184
headers=header_dict,
182185
params=dict(query_params) if query_params is not None else None,
183-
content=_stream_request(data) if data else None, # type: ignore[arg-type]
186+
data=data,
187+
stream=True,
184188
timeout=timeout,
189+
# By default, external_request will resolve any redirects for any verb except for HEAD.
190+
# We explicitly disable this behavior to avoid misrepresenting the original sentry.io request with the
191+
# body response of the redirect.
192+
allow_redirects=False,
185193
)
186-
resp = await proxy_client.send(req, stream=True, follow_redirects=False)
187-
return _adapt_response(resp, target_url)
188-
except (httpx.TimeoutException, asyncio.CancelledError):
194+
except Timeout:
195+
metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags)
196+
try:
197+
if circuit_breaker is not None:
198+
circuit_breaker.record_error()
199+
except Exception:
200+
logger.exception("Failed to record circuitbreaker failure")
201+
189202
# remote silo timeout. Use DRF timeout instead
190203
raise RequestTimeout()
204+
205+
if resp.status_code >= 500 and circuit_breaker is not None:
206+
metrics.incr("apigateway.proxy.request_failed", tags=metric_tags)
207+
circuit_breaker.record_error()
208+
209+
new_headers = clean_outbound_headers(resp.headers)
210+
resp.headers.clear()
211+
resp.headers.update(new_headers)
212+
return _parse_response(resp, target_url)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .apigateway import proxy_request_if_needed
2+
3+
__all__ = ("proxy_request_if_needed",)

0 commit comments

Comments
 (0)