Skip to content

Commit 1ff7ea3

Browse files
committed
feat(apigateway): implement circuitbreakers in async variant
1 parent 98f96d9 commit 1ff7ea3

File tree

3 files changed

+163
-30
lines changed

3 files changed

+163
-30
lines changed

src/sentry/conf/server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3183,6 +3183,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
31833183
}
31843184
# Used in tests to skip forwarding relay paths to a region silo that does not exist.
31853185
APIGATEWAY_PROXY_SKIP_RELAY = False
3186+
APIGATEWAY_PROXY_MAX_CONCURRENCY = 100
3187+
APIGATEWAY_PROXY_MAX_FAILURES = 100
3188+
APIGATEWAY_PROXY_FAILURE_WINDOW = 60
31863189

31873190
# Shared resource ids for accounting
31883191
EVENT_PROCESSING_STORE = "rc_processing_redis"
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import time
5+
from collections import defaultdict
6+
7+
from django.conf import settings
8+
9+
10+
class CircuitBreaker:
11+
__slots__ = [
12+
"concurrency",
13+
"counter_window",
14+
"failures",
15+
"semaphore",
16+
"_clock",
17+
"_counters",
18+
"_counter_idx",
19+
]
20+
21+
def __init__(self, concurrency, failures):
22+
self.concurrency = concurrency
23+
self.counter_window = failures[0]
24+
self.failures = failures[1]
25+
self.semaphore = asyncio.Semaphore(self.concurrency)
26+
self._clock = 0
27+
self._counters = [0, 0]
28+
self._counter_idx = 0
29+
30+
def _counter_flip(self, clock):
31+
self._clock = clock
32+
prev = self._counter_idx
33+
self._counter_idx = 1 - prev
34+
self._counters[prev] = 0
35+
36+
def _maybe_counter_flip(self):
37+
now = int(time.monotonic())
38+
delta = now - self._clock
39+
if delta > 0:
40+
if delta // self.counter_window:
41+
self._counter_flip(now)
42+
43+
def counter_incr(self):
44+
self._maybe_counter_flip()
45+
self._counters[self._counter_idx] += 1
46+
47+
def window_overflow(self) -> bool:
48+
return self._counters[self._counter_idx] > self.failures
49+
50+
def overflow(self) -> bool:
51+
return self.semaphore.locked()
52+
53+
def ctx(self) -> CircuitBreakerCtx:
54+
return CircuitBreakerCtx(self)
55+
56+
57+
class CircuitBreakerOverflow(Exception): ...
58+
59+
60+
class CircuitBreakerWindowOverflow(Exception): ...
61+
62+
63+
class CircuitBreakerCtx:
64+
__slots__ = ["cb"]
65+
66+
def __init__(self, cb: CircuitBreaker):
67+
self.cb = cb
68+
69+
def incr_failures(self):
70+
self.cb.counter_incr()
71+
72+
async def __aenter__(self):
73+
if self.cb.overflow():
74+
raise CircuitBreakerOverflow
75+
await self.cb.semaphore.acquire()
76+
if self.cb.window_overflow():
77+
self.cb.semaphore.release()
78+
raise CircuitBreakerWindowOverflow
79+
return self
80+
81+
async def __aexit__(self, exc_type, exc_value, exc_tb):
82+
self.cb.semaphore.release()
83+
84+
85+
class CircuitBreakerManager:
86+
__slots__ = ["objs"]
87+
88+
def __init__(
89+
self,
90+
max_concurrency: int | None = None,
91+
failures: int | None = None,
92+
failure_window: int | None = None,
93+
):
94+
concurrency = max_concurrency or settings.APIGATEWAY_PROXY_MAX_CONCURRENCY
95+
failures = failures or settings.APIGATEWAY_PROXY_MAX_FAILURES
96+
failure_window = failure_window or settings.APIGATEWAY_PROXY_FAILURE_WINDOW
97+
self.objs = defaultdict(lambda: CircuitBreaker(concurrency, (failure_window, failures)))
98+
99+
def get(self, key: str) -> CircuitBreakerCtx:
100+
return self.objs[key].ctx()

src/sentry/hybridcloud/apigateway_async/proxy.py

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import httpx
1414
from asgiref.sync import sync_to_async
1515
from django.conf import settings
16-
from django.http import HttpRequest, HttpResponse, StreamingHttpResponse
16+
from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse
1717
from django.http.response import HttpResponseBase
1818

1919
from sentry import options
@@ -34,9 +34,16 @@
3434
from sentry.utils import metrics
3535
from sentry.utils.http import BodyAsyncWrapper
3636

37+
from .circuitbreaker import (
38+
CircuitBreakerManager,
39+
CircuitBreakerOverflow,
40+
CircuitBreakerWindowOverflow,
41+
)
42+
3743
logger = logging.getLogger(__name__)
3844

3945
proxy_client = httpx.AsyncClient()
46+
circuitbreakers = CircuitBreakerManager()
4047

4148
# Endpoints that handle uploaded files have higher timeouts configured
4249
# and we need to honor those timeouts when proxying.
@@ -141,8 +148,9 @@ async def proxy_cell_request(
141148
request: HttpRequest,
142149
cell: Cell,
143150
url_name: str,
144-
) -> StreamingHttpResponse:
151+
) -> HttpResponseBase:
145152
"""Take a django request object and proxy it to a cell silo"""
153+
metric_tags = {"region": cell.name, "url_name": url_name}
146154
target_url = urljoin(cell.address, request.path)
147155

148156
content_encoding = request.headers.get("Content-Encoding")
@@ -154,37 +162,59 @@ async def proxy_cell_request(
154162
query_params = request.GET
155163

156164
timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT)
157-
metric_tags = {"region": cell.name, "url_name": url_name}
158165

159166
# XXX: See sentry.testutils.pytest.sentry for more information
160167
if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"):
161168
return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404)
162169

163-
if url_name == "sentry-api-0-organization-objectstore":
164-
if content_encoding:
165-
header_dict["Content-Encoding"] = content_encoding
166-
data = get_raw_body_async(request)
167-
else:
168-
data = BodyAsyncWrapper(request.body)
169-
# With request streaming, and without `Content-Length` header,
170-
# `httpx` will set chunked transfer encoding.
171-
# Upstream doesn't necessarily support this,
172-
# thus we re-add the header if it was present in the original request.
173-
if content_length:
174-
header_dict["Content-Length"] = content_length
175-
176170
try:
177-
with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags):
178-
req = proxy_client.build_request(
179-
request.method,
180-
target_url,
181-
headers=header_dict,
182-
params=dict(query_params) if query_params is not None else None,
183-
content=_stream_request(data) if data else None, # type: ignore[arg-type]
184-
timeout=timeout,
185-
)
186-
resp = await proxy_client.send(req, stream=True, follow_redirects=False)
187-
return _adapt_response(resp, target_url)
188-
except (httpx.TimeoutException, asyncio.CancelledError):
189-
# remote silo timeout. Use DRF timeout instead
190-
raise RequestTimeout()
171+
async with circuitbreakers.get(cell.name) as circuitbreaker:
172+
if url_name == "sentry-api-0-organization-objectstore":
173+
if content_encoding:
174+
header_dict["Content-Encoding"] = content_encoding
175+
data = get_raw_body_async(request)
176+
else:
177+
data = BodyAsyncWrapper(request.body)
178+
# With request streaming, and without `Content-Length` header,
179+
# `httpx` will set chunked transfer encoding.
180+
# Upstream doesn't necessarily support this,
181+
# thus we re-add the header if it was present in the original request.
182+
if content_length:
183+
header_dict["Content-Length"] = content_length
184+
185+
try:
186+
with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags):
187+
req = proxy_client.build_request(
188+
request.method,
189+
target_url,
190+
headers=header_dict,
191+
params=dict(query_params) if query_params is not None else None,
192+
content=_stream_request(data) if data else None, # type: ignore[arg-type]
193+
timeout=timeout,
194+
)
195+
resp = await proxy_client.send(req, stream=True, follow_redirects=False)
196+
if resp.status_code >= 502:
197+
metrics.incr("apigateway.proxy.request_failed", tags=metric_tags)
198+
circuitbreaker.incr_failures()
199+
return _adapt_response(resp, target_url)
200+
except (httpx.TimeoutException, asyncio.CancelledError):
201+
metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags)
202+
circuitbreaker.incr_failures()
203+
# remote silo timeout. Use DRF timeout instead
204+
raise RequestTimeout()
205+
except httpx.RequestError:
206+
metrics.incr("apigateway.proxy.request_failed", tags=metric_tags)
207+
circuitbreaker.incr_failures()
208+
raise
209+
except CircuitBreakerOverflow:
210+
metrics.incr("apigateway.proxy.circuit_breaker.overflow", tags=metric_tags)
211+
return JsonResponse(
212+
{"error": "apigateway", "detail": "Too many requests"},
213+
status=429,
214+
)
215+
except CircuitBreakerWindowOverflow:
216+
metrics.incr("apigateway.proxy.circuit_breaker.rejected", tags=metric_tags)
217+
return JsonResponse(
218+
{"error": "apigateway", "detail": "Downstream service temporarily unavailable"},
219+
status=503,
220+
)

0 commit comments

Comments
 (0)