Skip to content

Commit b71c092

Browse files
authored
feat(hybridcloud): async apigateway (#111307)
This changes the `apigateway` proxy to be async, with the idea to serve the relevant deployment of control silo in ASGI rather than WSGI. The rationale here is to avoid situations in which we exhaust the server's threadpool by just waiting for apigateway requests to complete, as we saw in INCs 2054/2056. **Note:** the APIGateway changes are gated into a separated Python module, the async flow is enabled through `SENTRY_ASYNC_APIGW` environment variable. This allows us to control the rollout of the change in prod. Tests and local devserver are instead always using the new code. Detailed changes: - [x] Make APIGateway proxy `async`, switching inner client impl from `requests` to `httpx` - [x] Change APIGateway middleware to work both in ASGI and WSGI contexts (with the latter using `async_to_sync`) - [x] Update relevant tests interacting with APIGateway - [x] Fix proxy acceptance test - <s> Fix ORM calls in the custom SDK integration</s> - [x] Bypass ORM calls in SDK custom logging integration - [x] Restore/adapt circuit brakers
1 parent f93bbec commit b71c092

File tree

28 files changed

+1023
-391
lines changed

28 files changed

+1023
-391
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
/src/sentry/projectoptions/ @getsentry/app-backend
3636
/src/sentry/receivers/ @getsentry/app-backend
3737
/src/sentry/ratelimits/ @getsentry/app-backend
38+
/src/sentry/asgi.py @getsentry/app-backend
3839

3940
/tests/js/sentry-test/ @getsentry/app-frontend
4041
/static/app/utils/ @getsentry/app-frontend

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,15 @@ dependencies = [
3737
"google-cloud-storage-transfer>=1.17.0",
3838
"google-crc32c>=1.6.0",
3939
"googleapis-common-protos>=1.63.2",
40-
"granian[pname,reload]>=2.7",
40+
"granian[pname,reload,uvloop]>=2.7",
4141
"grpc-google-iam-v1>=0.13.1",
4242
# Note, grpcio>1.30.0 requires setting GRPC_POLL_STRATEGY=epoll1
4343
# See https://github.com/grpc/grpc/issues/23796 and
4444
# https://github.com/grpc/grpc/blob/v1.35.x/doc/core/grpc-polling-engines.md#polling-engine-implementations-in-grpc
4545
"grpcio>=1.67.0",
4646
# not directly used, but provides a speedup for redis
4747
"hiredis>=2.3.2",
48+
"httpx>=0.28.1",
4849
"jsonschema>=4.20.0",
4950
"lxml>=5.3.0",
5051
"maxminddb>=2.3.0",

src/sentry/asgi.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import os.path
2+
import sys
3+
4+
# Add the project to the python path
5+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir))
6+
7+
# Configure the application only if it seemingly isn't already configured
8+
from django.conf import settings
9+
10+
if not settings.configured:
11+
from sentry.runner import configure
12+
13+
configure()
14+
15+
from django.core.handlers.asgi import ASGIHandler
16+
17+
# Run ASGI handler for the application
18+
application = ASGIHandler()

src/sentry/conf/server.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,12 @@ def env(
371371
# so that responses aren't modified after Content-Length is set, or have the
372372
# response modifying middleware reset the Content-Length header.
373373
# This is because CommonMiddleware Sets the Content-Length header for non-streaming responses.
374+
APIGW_ASYNC = os.environ.get("SENTRY_APIGW_ASYNC", "").lower() in ("1", "true", "y", "yes")
375+
APIGW_MIDDLEWARE = (
376+
"sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware"
377+
if APIGW_ASYNC
378+
else "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware"
379+
)
374380
MIDDLEWARE: tuple[str, ...] = (
375381
"csp.middleware.CSPMiddleware",
376382
"sentry.middleware.health.HealthCheck",
@@ -387,7 +393,7 @@ def env(
387393
"sentry.middleware.auth.AuthenticationMiddleware",
388394
"sentry.middleware.ai_agent.AIAgentMiddleware",
389395
"sentry.middleware.integrations.IntegrationControlMiddleware",
390-
"sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware",
396+
APIGW_MIDDLEWARE,
391397
"sentry.middleware.demo_mode_guard.DemoModeGuardMiddleware",
392398
"sentry.middleware.customer_domain.CustomerDomainMiddleware",
393399
"sentry.middleware.sudo.SudoMiddleware",
@@ -3181,6 +3187,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
31813187
}
31823188
# Used in tests to skip forwarding relay paths to a region silo that does not exist.
31833189
APIGATEWAY_PROXY_SKIP_RELAY = False
3190+
APIGATEWAY_PROXY_MAX_CONCURRENCY = int(os.environ.get("SENTRY_APIGW_PROXY_MAX_CONCURRENCY", 100))
3191+
APIGATEWAY_PROXY_MAX_FAILURES = int(os.environ.get("SENTRY_APIGW_PROXY_MAX_FAILURES", 100))
3192+
APIGATEWAY_PROXY_FAILURE_WINDOW = int(os.environ.get("SENTRY_APIGW_PROXY_FAILURE_WINDOW", 60))
31843193

31853194
# Shared resource ids for accounting
31863195
EVENT_PROCESSING_STORE = "rc_processing_redis"
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .apigateway import proxy_request_if_needed
2+
3+
__all__ = ("proxy_request_if_needed",)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from collections.abc import Callable
5+
from typing import Any
6+
7+
from django.conf import settings
8+
from django.http.response import HttpResponseBase
9+
from rest_framework.request import Request
10+
11+
from sentry.silo.base import SiloLimit, SiloMode
12+
from sentry.types.cell import get_cell_by_name
13+
from sentry.utils import metrics
14+
15+
from .proxy import (
16+
proxy_cell_request,
17+
proxy_error_embed_request,
18+
proxy_request,
19+
)
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset[SiloMode] | None:
25+
view_class = getattr(view_func, "view_class", None)
26+
if not view_class:
27+
return None
28+
if not hasattr(view_class, "silo_limit"):
29+
return None
30+
endpoint_silo_limit: SiloLimit = view_class.silo_limit
31+
return endpoint_silo_limit.modes
32+
33+
34+
async def proxy_request_if_needed(
35+
request: Request,
36+
view_func: Callable[..., HttpResponseBase],
37+
view_kwargs: dict[str, Any],
38+
) -> HttpResponseBase | None:
39+
"""
40+
Main execution flow for the API Gateway.
41+
returns None if proxying is not required, or a response if the proxy was successful.
42+
"""
43+
current_silo_mode = SiloMode.get_current_mode()
44+
if current_silo_mode != SiloMode.CONTROL:
45+
return None
46+
47+
silo_modes = _get_view_silo_mode(view_func)
48+
if not silo_modes or current_silo_mode in silo_modes:
49+
return None
50+
51+
url_name = "unknown"
52+
if request.resolver_match:
53+
url_name = request.resolver_match.url_name or url_name
54+
55+
if "organization_slug" in view_kwargs or "organization_id_or_slug" in view_kwargs:
56+
org_id_or_slug = str(
57+
view_kwargs.get("organization_slug") or view_kwargs.get("organization_id_or_slug", "")
58+
)
59+
60+
metrics.incr(
61+
"apigateway.proxy_request",
62+
tags={
63+
"url_name": url_name,
64+
"kind": "orgslug",
65+
},
66+
)
67+
return await proxy_request(request, org_id_or_slug, url_name)
68+
69+
if url_name == "sentry-error-page-embed" and "dsn" in request.GET:
70+
# Error embed modal is special as customers can't easily use cell URLs.
71+
dsn = request.GET["dsn"]
72+
metrics.incr(
73+
"apigateway.proxy_request",
74+
tags={
75+
"url_name": url_name,
76+
"kind": "error-embed",
77+
},
78+
)
79+
return await proxy_error_embed_request(request, dsn, url_name)
80+
81+
if (
82+
request.resolver_match
83+
and request.resolver_match.url_name in settings.REGION_PINNED_URL_NAMES
84+
):
85+
cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION)
86+
metrics.incr(
87+
"apigateway.proxy_request",
88+
tags={
89+
"url_name": url_name,
90+
"kind": "regionpin",
91+
},
92+
)
93+
94+
return await proxy_cell_request(request, cell, url_name)
95+
96+
if url_name != "unknown":
97+
# If we know the URL but didn't proxy it record we could be missing
98+
# URL handling and that needs to be fixed.
99+
metrics.incr(
100+
"apigateway.proxy_request",
101+
tags={
102+
"kind": "noop",
103+
"url_name": url_name,
104+
},
105+
)
106+
logger.info("apigateway.unknown_url", extra={"url": request.path})
107+
108+
return None
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import time
5+
from collections import defaultdict
6+
from typing import Any
7+
8+
from django.conf import settings
9+
10+
11+
class CircuitBreaker:
12+
__slots__ = [
13+
"concurrency",
14+
"counter_window",
15+
"failures",
16+
"semaphore",
17+
"_clock",
18+
"_counters",
19+
"_counter_idx",
20+
]
21+
22+
def __init__(self, concurrency: int, failures: tuple[int, int]) -> None:
23+
self.concurrency = concurrency
24+
self.counter_window = failures[0]
25+
self.failures = failures[1]
26+
self.semaphore = asyncio.Semaphore(self.concurrency)
27+
self._clock = 0
28+
self._counters = [0, 0]
29+
self._counter_idx = 0
30+
31+
def _counter_flip(self, clock: int) -> None:
32+
self._clock = clock
33+
prev = self._counter_idx
34+
self._counter_idx = 1 - prev
35+
self._counters[prev] = 0
36+
37+
def _maybe_counter_flip(self) -> None:
38+
now = int(time.monotonic())
39+
delta = now - self._clock
40+
if delta > 0:
41+
if delta // self.counter_window:
42+
self._counter_flip(now)
43+
44+
def counter_incr(self) -> None:
45+
self._maybe_counter_flip()
46+
self._counters[self._counter_idx] += 1
47+
48+
def window_overflow(self) -> bool:
49+
self._maybe_counter_flip()
50+
return self._counters[self._counter_idx] > self.failures
51+
52+
def overflow(self) -> bool:
53+
return self.semaphore.locked()
54+
55+
def ctx(self) -> CircuitBreakerCtx:
56+
return CircuitBreakerCtx(self)
57+
58+
59+
class CircuitBreakerOverflow(Exception): ...
60+
61+
62+
class CircuitBreakerWindowOverflow(Exception): ...
63+
64+
65+
class CircuitBreakerCtx:
66+
__slots__ = ["cb"]
67+
68+
def __init__(self, cb: CircuitBreaker):
69+
self.cb = cb
70+
71+
def incr_failures(self) -> None:
72+
self.cb.counter_incr()
73+
74+
async def __aenter__(self) -> CircuitBreakerCtx:
75+
if self.cb.overflow():
76+
raise CircuitBreakerOverflow
77+
await self.cb.semaphore.acquire()
78+
if self.cb.window_overflow():
79+
self.cb.semaphore.release()
80+
raise CircuitBreakerWindowOverflow
81+
return self
82+
83+
async def __aexit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> None:
84+
self.cb.semaphore.release()
85+
86+
87+
class CircuitBreakerManager:
88+
__slots__ = ["objs"]
89+
90+
def __init__(
91+
self,
92+
max_concurrency: int | None = None,
93+
failures: int | None = None,
94+
failure_window: int | None = None,
95+
):
96+
concurrency = max_concurrency or settings.APIGATEWAY_PROXY_MAX_CONCURRENCY
97+
failures = failures or settings.APIGATEWAY_PROXY_MAX_FAILURES
98+
failure_window = failure_window or settings.APIGATEWAY_PROXY_FAILURE_WINDOW
99+
self.objs: dict[str, CircuitBreaker] = defaultdict(
100+
lambda: CircuitBreaker(concurrency, (failure_window, failures))
101+
)
102+
103+
def get(self, key: str) -> CircuitBreakerCtx:
104+
return self.objs[key].ctx()
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from collections.abc import Callable
5+
from typing import Any
6+
7+
from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction
8+
from django.http.response import HttpResponseBase
9+
from rest_framework.request import Request
10+
11+
from . import proxy_request_if_needed
12+
13+
14+
class ApiGatewayMiddleware:
15+
"""Proxy requests intended for remote silos"""
16+
17+
async_capable = True
18+
sync_capable = True
19+
20+
def __init__(self, get_response: Callable[[Request], HttpResponseBase]):
21+
self.get_response = get_response
22+
if iscoroutinefunction(self.get_response):
23+
markcoroutinefunction(self)
24+
25+
def __call__(self, request: Request) -> Any:
26+
if iscoroutinefunction(self):
27+
return self.__acall__(request)
28+
return self.get_response(request)
29+
30+
async def __acall__(self, request: Request) -> HttpResponseBase:
31+
return await self.get_response(request) # type: ignore[misc]
32+
33+
def process_view(
34+
self,
35+
request: Request,
36+
view_func: Callable[..., HttpResponseBase],
37+
view_args: tuple[str],
38+
view_kwargs: dict[str, Any],
39+
) -> HttpResponseBase | None:
40+
return self._process_view_match(request, view_func, view_args, view_kwargs)
41+
42+
def _process_view_match(
43+
self,
44+
request: Request,
45+
view_func: Callable[..., HttpResponseBase],
46+
view_args: tuple[str],
47+
view_kwargs: dict[str, Any],
48+
) -> Any:
49+
#: we check if we're in an async or sync runtime once, then
50+
# overwrite the method with the actual impl.
51+
try:
52+
asyncio.get_running_loop()
53+
method = self._process_view_inner
54+
except RuntimeError:
55+
method = self._process_view_sync # type: ignore[assignment]
56+
setattr(self, "_process_view_match", method)
57+
return method(request, view_func, view_args, view_kwargs)
58+
59+
def _process_view_sync(
60+
self,
61+
request: Request,
62+
view_func: Callable[..., HttpResponseBase],
63+
view_args: tuple[str],
64+
view_kwargs: dict[str, Any],
65+
) -> HttpResponseBase | None:
66+
return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs)
67+
68+
async def _process_view_inner(
69+
self,
70+
request: Request,
71+
view_func: Callable[..., HttpResponseBase],
72+
view_args: tuple[str],
73+
view_kwargs: dict[str, Any],
74+
) -> HttpResponseBase | None:
75+
proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs)
76+
if proxy_response is not None:
77+
return proxy_response
78+
return None

0 commit comments

Comments
 (0)