-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
feat(hybridcloud): async apigateway #111307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6277f28
b6e74e3
e74f1fd
556cdc8
2398fdc
39706a4
6785c1d
602e39a
52d23a4
bb62cbf
8760f63
89fc588
12eb8df
944123b
71f31e3
e1951af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
"""ASGI application entrypoint.

Puts the project root on ``sys.path`` and configures Django/Sentry settings
(when not already configured) before the ASGI handler is instantiated.
"""

import os.path
import sys

# Add the project to the python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir))

# Configure the application only if it seemingly isn't already configured
from django.conf import settings

if not settings.configured:
    from sentry.runner import configure

    configure()

# NOTE: deliberately imported only after configure() above — keep this
# import below the configuration step.
from django.core.handlers.asgi import ASGIHandler

# Run ASGI handler for the application
application = ASGIHandler()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
"""API Gateway package: exposes the async request-proxying entry point."""

from .apigateway import proxy_request_if_needed

__all__ = ("proxy_request_if_needed",)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from collections.abc import Callable | ||
| from typing import Any | ||
|
|
||
| from django.conf import settings | ||
| from django.http.response import HttpResponseBase | ||
| from rest_framework.request import Request | ||
|
|
||
| from sentry.silo.base import SiloLimit, SiloMode | ||
| from sentry.types.cell import get_cell_by_name | ||
| from sentry.utils import metrics | ||
|
|
||
| from .proxy import ( | ||
| proxy_cell_request, | ||
| proxy_error_embed_request, | ||
| proxy_request, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset[SiloMode] | None: | ||
| view_class = getattr(view_func, "view_class", None) | ||
| if not view_class: | ||
| return None | ||
| if not hasattr(view_class, "silo_limit"): | ||
| return None | ||
| endpoint_silo_limit: SiloLimit = view_class.silo_limit | ||
| return endpoint_silo_limit.modes | ||
|
|
||
|
|
||
async def proxy_request_if_needed(
    request: Request,
    view_func: Callable[..., HttpResponseBase],
    view_kwargs: dict[str, Any],
) -> HttpResponseBase | None:
    """
    Main execution flow for the API Gateway.

    Returns ``None`` when proxying is not required, or the proxied
    response when it is. Only control-silo instances ever proxy; views
    the current silo is allowed to serve are passed through untouched.
    """
    mode = SiloMode.get_current_mode()
    if mode != SiloMode.CONTROL:
        # Only the control silo forwards traffic to other silos.
        return None

    allowed_modes = _get_view_silo_mode(view_func)
    if not allowed_modes or mode in allowed_modes:
        # Unrestricted view, or one this silo can serve locally.
        return None

    match = request.resolver_match
    url_name = (match.url_name or "unknown") if match else "unknown"

    # Organization-scoped URLs: route by the organization slug/id.
    if "organization_slug" in view_kwargs or "organization_id_or_slug" in view_kwargs:
        slug_or_id = str(
            view_kwargs.get("organization_slug")
            or view_kwargs.get("organization_id_or_slug", "")
        )
        metrics.incr(
            "apigateway.proxy_request",
            tags={"url_name": url_name, "kind": "orgslug"},
        )
        return await proxy_request(request, slug_or_id, url_name)

    if url_name == "sentry-error-page-embed" and "dsn" in request.GET:
        # Error embed modal is special as customers can't easily use cell URLs.
        metrics.incr(
            "apigateway.proxy_request",
            tags={"url_name": url_name, "kind": "error-embed"},
        )
        return await proxy_error_embed_request(request, request.GET["dsn"], url_name)

    # URLs pinned to a region are sent to the monolith cell.
    if match and match.url_name in settings.REGION_PINNED_URL_NAMES:
        cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION)
        metrics.incr(
            "apigateway.proxy_request",
            tags={"url_name": url_name, "kind": "regionpin"},
        )
        return await proxy_cell_request(request, cell, url_name)

    if url_name != "unknown":
        # If we know the URL but didn't proxy it record we could be missing
        # URL handling and that needs to be fixed.
        metrics.incr(
            "apigateway.proxy_request",
            tags={"kind": "noop", "url_name": url_name},
        )
        logger.info("apigateway.unknown_url", extra={"url": request.path})

    return None
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import time | ||
| from collections import defaultdict | ||
| from typing import Any | ||
|
|
||
| from django.conf import settings | ||
|
|
||
|
|
||
class CircuitBreaker:
    """Per-process circuit breaker for proxied upstream requests.

    Combines two independent trip conditions:

    - a concurrency cap, enforced with an :class:`asyncio.Semaphore` of
      ``concurrency`` slots;
    - a failure threshold, tracked in per-window counters where
      ``failures`` is a ``(window_seconds, max_failures)`` pair. The
      window "trips" only once the count *exceeds* ``max_failures``.
    """

    __slots__ = [
        "concurrency",
        "counter_window",
        "failures",
        "semaphore",
        "_clock",
        "_counters",
        "_counter_idx",
    ]

    def __init__(self, concurrency: int, failures: tuple[int, int]) -> None:
        self.concurrency = concurrency
        self.counter_window = failures[0]
        self.failures = failures[1]
        self.semaphore = asyncio.Semaphore(self.concurrency)
        # Two alternating counters; only the "current" one is incremented
        # or read, and a flip zeroes the one being abandoned.
        self._clock = 0
        self._counters = [0, 0]
        self._counter_idx = 0

    def _counter_flip(self, clock: int) -> None:
        # Swap the active counter and reset the one we are leaving behind.
        self._clock = clock
        stale = self._counter_idx
        self._counter_idx = 1 - stale
        self._counters[stale] = 0

    def _maybe_counter_flip(self) -> None:
        # Start a fresh window once at least one full window has elapsed.
        now = int(time.monotonic())
        elapsed = now - self._clock
        if elapsed > 0 and elapsed // self.counter_window:
            self._counter_flip(now)

    def counter_incr(self) -> None:
        """Record one failure in the current window."""
        self._maybe_counter_flip()
        self._counters[self._counter_idx] += 1

    def window_overflow(self) -> bool:
        """Return True once the current window's failures exceed the limit."""
        self._maybe_counter_flip()
        return self._counters[self._counter_idx] > self.failures

    def overflow(self) -> bool:
        """Return True when every concurrency slot is currently taken."""
        return self.semaphore.locked()

    def ctx(self) -> CircuitBreakerCtx:
        """Build a fresh async context manager bound to this breaker."""
        return CircuitBreakerCtx(self)
|
|
||
|
|
||
class CircuitBreakerOverflow(Exception):
    """Raised on entry when every concurrency slot is already in use."""


class CircuitBreakerWindowOverflow(Exception):
    """Raised on entry when the current window's failure count is over the limit."""
|
|
||
|
|
||
class CircuitBreakerCtx:
    """Async context manager guarding a single proxied request via a breaker.

    Entering raises ``CircuitBreakerOverflow`` when all concurrency slots
    are busy, or ``CircuitBreakerWindowOverflow`` when the recent failure
    count is over the limit; otherwise a semaphore slot is held until exit.
    """

    __slots__ = ["cb"]

    def __init__(self, cb: CircuitBreaker):
        self.cb = cb

    def incr_failures(self) -> None:
        """Record a failed upstream request against the breaker."""
        self.cb.counter_incr()

    async def __aenter__(self) -> CircuitBreakerCtx:
        breaker = self.cb
        if breaker.overflow():
            raise CircuitBreakerOverflow
        await breaker.semaphore.acquire()
        if breaker.window_overflow():
            # Give the slot back immediately: we trip instead of proxying.
            breaker.semaphore.release()
            raise CircuitBreakerWindowOverflow
        return self

    async def __aexit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> None:
        self.cb.semaphore.release()
|
|
||
|
|
||
class CircuitBreakerManager:
    """Lazily builds and caches one ``CircuitBreaker`` per destination key.

    Limits default to the ``APIGATEWAY_PROXY_*`` settings; explicit
    constructor arguments take precedence.
    """

    __slots__ = ["objs"]

    def __init__(
        self,
        max_concurrency: int | None = None,
        failures: int | None = None,
        failure_window: int | None = None,
    ):
        limit = max_concurrency or settings.APIGATEWAY_PROXY_MAX_CONCURRENCY
        max_failures = failures or settings.APIGATEWAY_PROXY_MAX_FAILURES
        window = failure_window or settings.APIGATEWAY_PROXY_FAILURE_WINDOW
        # defaultdict so a breaker is created on first use of each key.
        self.objs: dict[str, CircuitBreaker] = defaultdict(
            lambda: CircuitBreaker(limit, (window, max_failures))
        )

    def get(self, key: str) -> CircuitBreakerCtx:
        """Return a context manager for the breaker associated with *key*."""
        return self.objs[key].ctx()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| from collections.abc import Callable | ||
| from typing import Any | ||
|
|
||
| from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction | ||
| from django.http.response import HttpResponseBase | ||
| from rest_framework.request import Request | ||
|
|
||
| from . import proxy_request_if_needed | ||
|
|
||
|
|
||
class ApiGatewayMiddleware:
    """Proxy requests intended for remote silos.

    Works on both sync (WSGI) and async (ASGI) stacks: the instance marks
    itself as a coroutine when the wrapped handler is one, so Django does
    not insert sync/async adapters around it. The actual proxy decision
    lives in ``proxy_request_if_needed``.
    """

    async_capable = True
    sync_capable = True

    def __init__(self, get_response: Callable[[Request], HttpResponseBase]):
        self.get_response = get_response
        # Mirror the sync/async nature of the downstream handler.
        if iscoroutinefunction(self.get_response):
            markcoroutinefunction(self)

    def __call__(self, request: Request) -> Any:
        # Dispatch based on how this instance was marked in __init__.
        if iscoroutinefunction(self):
            return self.__acall__(request)
        return self.get_response(request)

    async def __acall__(self, request: Request) -> HttpResponseBase:
        # Async path: the wrapped handler is a coroutine function here.
        return await self.get_response(request)  # type: ignore[misc]

    def process_view(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[Any, ...],
        view_kwargs: dict[str, Any],
    ) -> HttpResponseBase | None:
        # Returning a response short-circuits the view (the request was
        # proxied); returning None lets the local view run as usual.
        return self._process_view_match(request, view_func, view_args, view_kwargs)

    def _process_view_match(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[Any, ...],
        view_kwargs: dict[str, Any],
    ) -> Any:
        # We check if we're in an async or sync runtime once, then
        # overwrite the method with the actual impl.
        # NOTE(review): the choice is cached per instance, so the first
        # request decides for the middleware's lifetime — assumes a given
        # server process is consistently sync or async; confirm.
        try:
            asyncio.get_running_loop()
            method = self._process_view_inner
        except RuntimeError:
            # No running loop: sync runtime, bridge via async_to_sync.
            method = self._process_view_sync  # type: ignore[assignment]
        # Instance attribute shadows this method on subsequent calls.
        setattr(self, "_process_view_match", method)
        return method(request, view_func, view_args, view_kwargs)

    def _process_view_sync(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[Any, ...],
        view_kwargs: dict[str, Any],
    ) -> HttpResponseBase | None:
        # Sync runtime: run the async implementation to completion.
        return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs)

    async def _process_view_inner(
        self,
        request: Request,
        view_func: Callable[..., HttpResponseBase],
        view_args: tuple[Any, ...],
        view_kwargs: dict[str, Any],
    ) -> HttpResponseBase | None:
        # Shared implementation for both runtimes.
        proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs)
        if proxy_response is not None:
            return proxy_response
        return None


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're running multiple replicas there doesn't seem to be a way to circuit break across the deployment, each replica will have to figure out that there is a problem independently. I'm guessing you went with this approach because our existing circuit breakers use sync-redis?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly for that reason, yes.
But also, I don't see that much value in having the circuit-breaking data shared across the whole deployment (or across the next deployment when we release new code). Having each worker (not the whole pod) keep its own state IMO allows us to configure more granular limits, and avoids situations in which we overflow the concurrency circuit because of scaling and the number of running control pods. There might also be conditions in which a single pod fails to connect to the upstream for $REASONS (e.g. machine-specific temporary network issues), and I don't think we want that single pod to overload the circuit for everything else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's all fair. We'll be able to use metrics to see how many gateways instances have breakers open as well, and observe how coherent that state is too.