From 6277f28b038e1b15479b3034e0ae5b09102a902b Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Mon, 23 Mar 2026 12:36:59 +0100 Subject: [PATCH 01/16] feat(hybridcloud): make apigateway async --- pyproject.toml | 1 + src/sentry/asgi.py | 18 ++ .../hybridcloud/apigateway/apigateway.py | 12 +- .../hybridcloud/apigateway/middleware.py | 54 +++- src/sentry/hybridcloud/apigateway/proxy.py | 145 +++++------ .../objectstore/endpoints/organization.py | 39 ++- src/sentry/runner/commands/devserver.py | 1 + src/sentry/services/http.py | 7 +- src/sentry/testutils/helpers/apigateway.py | 150 +++++++++-- src/sentry/testutils/helpers/response.py | 12 +- src/sentry/utils/http.py | 42 +++- .../hybridcloud/apigateway/test_apigateway.py | 92 +++---- .../apigateway/test_apigateway_helpers.py | 21 +- .../hybridcloud/apigateway/test_proxy.py | 236 ++---------------- uv.lock | 7 +- 15 files changed, 428 insertions(+), 409 deletions(-) create mode 100644 src/sentry/asgi.py diff --git a/pyproject.toml b/pyproject.toml index cf052bb24e5359..085dcd0fee9c88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "grpcio>=1.67.0", # not directly used, but provides a speedup for redis "hiredis>=2.3.2", + "httpx>=0.28.1", "jsonschema>=4.20.0", "lxml>=5.3.0", "maxminddb>=2.3.0", diff --git a/src/sentry/asgi.py b/src/sentry/asgi.py new file mode 100644 index 00000000000000..e27dba83252cb0 --- /dev/null +++ b/src/sentry/asgi.py @@ -0,0 +1,18 @@ +import os.path +import sys + +# Add the project to the python path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir)) + +# Configure the application only if it seemingly isn't already configured +from django.conf import settings + +if not settings.configured: + from sentry.runner import configure + + configure() + +from django.core.handlers.asgi import ASGIHandler + +# Run ASGI handler for the application +application = ASGIHandler() diff --git a/src/sentry/hybridcloud/apigateway/apigateway.py b/src/sentry/hybridcloud/apigateway/apigateway.py index 3f77e204fd180e..9dd2172ddd59e6 100644 --- a/src/sentry/hybridcloud/apigateway/apigateway.py +++ b/src/sentry/hybridcloud/apigateway/apigateway.py @@ -30,8 +30,10 @@ def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset return endpoint_silo_limit.modes -def proxy_request_if_needed( - request: Request, view_func: Callable[..., HttpResponseBase], view_kwargs: dict[str, Any] +async def proxy_request_if_needed( + request: Request, + view_func: Callable[..., HttpResponseBase], + view_kwargs: dict[str, Any], ) -> HttpResponseBase | None: """ Main execution flow for the API Gateway. @@ -61,7 +63,7 @@ def proxy_request_if_needed( "kind": "orgslug", }, ) - return proxy_request(request, org_id_or_slug, url_name) + return await proxy_request(request, org_id_or_slug, url_name) if url_name == "sentry-error-page-embed" and "dsn" in request.GET: # Error embed modal is special as customers can't easily use cell URLs. @@ -73,7 +75,7 @@ def proxy_request_if_needed( "kind": "error-embed", }, ) - return proxy_error_embed_request(request, dsn, url_name) + return await proxy_error_embed_request(request, dsn, url_name) if ( request.resolver_match @@ -88,7 +90,7 @@ def proxy_request_if_needed( }, ) - return proxy_cell_request(request, cell, url_name) + return await proxy_cell_request(request, cell, url_name) if url_name != "unknown": # If we know the URL but didn't proxy it record we could be missing diff --git a/src/sentry/hybridcloud/apigateway/middleware.py b/src/sentry/hybridcloud/apigateway/middleware.py index 4c67467aac8302..7fbcb13f7c19f9 100644 --- a/src/sentry/hybridcloud/apigateway/middleware.py +++ b/src/sentry/hybridcloud/apigateway/middleware.py @@ -1,8 +1,10 @@ from __future__ import annotations +import asyncio from collections.abc import Callable from typing import Any +from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction from django.http.response import HttpResponseBase from rest_framework.request import Request @@ -12,12 +14,22 @@ class ApiGatewayMiddleware: """Proxy requests intended for remote silos""" + async_capable = True + sync_capable = True + def __init__(self, get_response: Callable[[Request], HttpResponseBase]): self.get_response = get_response + if iscoroutinefunction(self.get_response): + markcoroutinefunction(self) - def __call__(self, request: Request) -> HttpResponseBase: + def __call__(self, request: Request) -> Any: + if iscoroutinefunction(self): + return self.__acall__(request) return self.get_response(request) + async def __acall__(self, request: Request) -> HttpResponseBase: + return await self.get_response(request) # type: ignore[misc] + def process_view( self, request: Request, @@ -25,8 +37,42 @@ def process_view( view_args: tuple[str], view_kwargs: dict[str, Any], ) -> HttpResponseBase | None: - proxy_response = proxy_request_if_needed(request, view_func, view_kwargs) + return self._process_view_match(request, view_func, view_args, view_kwargs) + + def _process_view_match( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> Any: + #: we check if we're in an async or sync runtime once, then + # overwrite the method with the actual impl. + try: + asyncio.get_running_loop() + method = self._process_view_inner + except RuntimeError: + method = self._process_view_sync # type: ignore[assignment] + setattr(self, "_process_view_match", method) + return method(request, view_func, view_args, view_kwargs) + + def _process_view_sync( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs) + + async def _process_view_inner( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs) if proxy_response is not None: return proxy_response - else: - return None + return None diff --git a/src/sentry/hybridcloud/apigateway/proxy.py b/src/sentry/hybridcloud/apigateway/proxy.py index 9f13ce9b8a6f2c..08618d224b7eca 100644 --- a/src/sentry/hybridcloud/apigateway/proxy.py +++ b/src/sentry/hybridcloud/apigateway/proxy.py @@ -4,21 +4,21 @@ from __future__ import annotations +import asyncio import logging -from collections.abc import Generator +from collections.abc import AsyncGenerator, AsyncIterator from urllib.parse import urljoin, urlparse from wsgiref.util import is_hop_by_hop +import httpx +from asgiref.sync import sync_to_async from django.conf import settings -from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse +from django.http import HttpRequest, HttpResponse, StreamingHttpResponse from django.http.response import HttpResponseBase -from requests import Response as ExternalResponse -from requests import request as external_request -from requests.exceptions import ConnectionError, Timeout from sentry import options from sentry.api.exceptions import RequestTimeout -from sentry.objectstore.endpoints.organization import ChunkedEncodingDecoder, get_raw_body +from sentry.objectstore.endpoints.organization import get_raw_body_async from sentry.silo.util import ( PROXY_APIGATEWAY_HEADER, PROXY_DIRECT_LOCATION_HEADER, @@ -32,11 +32,12 @@ get_cell_for_organization, ) from sentry.utils import metrics -from sentry.utils.circuit_breaker2 import CircuitBreaker, CountBasedTripStrategy -from sentry.utils.http import BodyWithLength +from sentry.utils.http import BodyAsyncWrapper logger = logging.getLogger(__name__) +proxy_client = httpx.AsyncClient() + # Endpoints that handle uploaded files have higher timeouts configured # and we need to honor those timeouts when proxying. # See frontend/templates/sites-enabled/sentry.io in getsentry/ops @@ -55,21 +56,28 @@ PROXY_CHUNK_SIZE = 512 * 1024 -def _parse_response(response: ExternalResponse, remote_url: str) -> StreamingHttpResponse: - """ - Convert the Responses class from requests into the drf Response - """ +async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator[bytes]: + """Yield chunks from an httpx response and close the connection when done.""" + try: + async for chunk in response.aiter_bytes(PROXY_CHUNK_SIZE): + yield chunk + finally: + await response.aclose() + + +def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpResponse: + """Convert an httpx Response into a Django response.""" - def stream_response() -> Generator[bytes]: - yield from response.iter_content(PROXY_CHUNK_SIZE) + new_headers = clean_outbound_headers(response.headers) + content_type = new_headers.pop("Content-Type", None) streamed_response = StreamingHttpResponse( - streaming_content=stream_response(), + streaming_content=_stream_response_and_close(response), status=response.status_code, - content_type=response.headers.pop("Content-Type", None), + content_type=content_type, ) - # Add Headers to response - for header, value in response.headers.items(): + + for header, value in new_headers.items(): if not is_hop_by_hop(header): streamed_response[header] = value @@ -77,19 +85,28 @@ def stream_response() -> Generator[bytes]: return streamed_response -def proxy_request(request: HttpRequest, org_id_or_slug: str, url_name: str) -> HttpResponseBase: +async def _stream_request(body: AsyncIterator[bytes]) -> AsyncGenerator[bytes]: + async for chunk in body: + yield chunk + + +async def proxy_request( + request: HttpRequest, + org_id_or_slug: str, + url_name: str, +) -> HttpResponseBase: """Take a django request object and proxy it to a remote location given an org_id_or_slug""" try: - cell = get_cell_for_organization(org_id_or_slug) + cell = await sync_to_async(get_cell_for_organization)(org_id_or_slug) except CellResolutionError as e: logger.info("region_resolution_error", extra={"org_slug": org_id_or_slug, "error": str(e)}) return HttpResponse(status=404) - return proxy_cell_request(request, cell, url_name) + return await proxy_cell_request(request, cell, url_name) -def proxy_error_embed_request( +async def proxy_error_embed_request( request: HttpRequest, dsn: str, url_name: str ) -> HttpResponseBase | None: try: @@ -109,7 +126,7 @@ def proxy_error_embed_request( # If we don't have a o123.ingest.{cell}.{app_host} style domain # we forward to the monolith cell cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION) - return proxy_cell_request(request, cell, url_name) + return await proxy_cell_request(request, cell, url_name) try: cell_offset = len(app_segments) + 1 cell_segment = host_segments[cell_offset * -1] @@ -117,100 +134,50 @@ def proxy_error_embed_request( except Exception: return None - return proxy_cell_request(request, cell, url_name) + return await proxy_cell_request(request, cell, url_name) -def proxy_cell_request(request: HttpRequest, cell: Cell, url_name: str) -> HttpResponseBase: +async def proxy_cell_request( + request: HttpRequest, + cell: Cell, + url_name: str, +) -> StreamingHttpResponse: """Take a django request object and proxy it to a cell silo""" - - metric_tags = {"region": cell.name, "url_name": url_name} - circuit_breaker: CircuitBreaker | None = None - # TODO(mark) remove rollout options - if options.get("apigateway.proxy.circuit-breaker.enabled"): - try: - circuit_breaker = CircuitBreaker( - key=f"apigateway.proxy.{cell.name}", - config=options.get("apigateway.proxy.circuit-breaker.config"), - trip_strategy=CountBasedTripStrategy.from_config( - options.get("apigateway.proxy.circuit-breaker.config") - ), - ) - except Exception as e: - logger.warning("apigateway.invalid-breaker-config", extra={"message": str(e)}) - - if circuit_breaker is not None: - if not circuit_breaker.should_allow_request(): - metrics.incr("apigateway.proxy.circuit_breaker.rejected", tags=metric_tags) - if options.get("apigateway.proxy.circuit-breaker.enforce"): - body = { - "error": "apigateway", - "detail": "Downstream service temporarily unavailable", - } - return JsonResponse(body, status=503) - target_url = urljoin(cell.address, request.path) content_encoding = request.headers.get("Content-Encoding") header_dict = clean_proxy_headers(request.headers) header_dict[PROXY_APIGATEWAY_HEADER] = "true" - # TODO: use requests session for connection pooling capabilities assert request.method is not None query_params = request.GET - # This option has a default of None, which is cast to 0 - timeout = options.get("apigateway.proxy.timeout") - if not timeout: - timeout = settings.GATEWAY_PROXY_TIMEOUT - timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, timeout) + timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT) + metric_tags = {"region": cell.name, "url_name": url_name} # XXX: See sentry.testutils.pytest.sentry for more information if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"): return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404) - data: bytes | Generator[bytes] | ChunkedEncodingDecoder | BodyWithLength | None = None if url_name == "sentry-api-0-organization-objectstore": if content_encoding: header_dict["Content-Encoding"] = content_encoding - data = get_raw_body(request) + data = get_raw_body_async(request) else: - data = BodyWithLength(request) + data = BodyAsyncWrapper(request.body) try: with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags): - resp = external_request( + req = proxy_client.build_request( request.method, - url=target_url, + target_url, headers=header_dict, params=dict(query_params) if query_params is not None else None, - data=data, - stream=True, + content=_stream_request(data), # type: ignore[arg-type] timeout=timeout, - # By default, external_request will resolve any redirects for any verb except for HEAD. - # We explicitly disable this behavior to avoid misrepresenting the original sentry.io request with the - # body response of the redirect. - allow_redirects=False, ) - except Timeout: - metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags) - if circuit_breaker is not None: - circuit_breaker.record_error() - + resp = await proxy_client.send(req, stream=True, follow_redirects=False) + return _adapt_response(resp, target_url) + except (httpx.TimeoutException, asyncio.CancelledError): # remote silo timeout. Use DRF timeout instead raise RequestTimeout() - except ConnectionError: - metrics.incr("apigateway.proxy.connection_error", tags=metric_tags) - if circuit_breaker is not None: - circuit_breaker.record_error() - - raise - - if resp.status_code >= 502: - metrics.incr("apigateway.proxy.request_failed", tags=metric_tags) - if circuit_breaker is not None: - circuit_breaker.record_error() - - new_headers = clean_outbound_headers(resp.headers) - resp.headers.clear() - resp.headers.update(new_headers) - return _parse_response(resp, target_url) diff --git a/src/sentry/objectstore/endpoints/organization.py b/src/sentry/objectstore/endpoints/organization.py index c6ed04945011c7..eab7e268877c02 100644 --- a/src/sentry/objectstore/endpoints/organization.py +++ b/src/sentry/objectstore/endpoints/organization.py @@ -6,12 +6,13 @@ from wsgiref.util import is_hop_by_hop import requests +from asgiref.sync import sync_to_async from django.http import HttpRequest, StreamingHttpResponse from requests import Response as ExternalResponse from rest_framework.request import Request from rest_framework.response import Response -from sentry.utils.http import BodyWithLength +from sentry.utils.http import BodyAsyncWrapper, BodyWithLength, BodyWithLengthAiter # TODO(granian): Remove this and related code paths when we fully switch from uwsgi to granian uwsgi: Any = None @@ -150,6 +151,32 @@ def stream_generator(): return BodyWithLength(request) +def get_raw_body_async( + request: HttpRequest, +) -> BodyAsyncWrapper | ChunkedEncodingAsyncDecoder | BodyWithLengthAiter | None: + if request.body: + return BodyAsyncWrapper(request.body) + + wsgi_input = request.META.get("wsgi.input") + if "granian" in request.META.get("SERVER_SOFTWARE", "").lower(): + return BodyAsyncWrapper(wsgi_input) + + # wsgiref will raise an exception and hang when attempting to read wsgi.input while there's no body. + # For now, support bodies only on PUT and POST requests when not using Granian. + if request.method not in ("PUT", "POST"): + return None + + # wsgiref (dev/test server) + if ( + hasattr(wsgi_input, "_read") + and request.headers.get("Transfer-Encoding", "").lower() == "chunked" + ): + return ChunkedEncodingAsyncDecoder(wsgi_input._read) # type: ignore[union-attr] + + # wsgiref and the request has been already proxied through control silo + return BodyWithLength(request).__aiter__() + + def get_target_url(path: str) -> str: base = options.get("objectstore.config")["base_url"].rstrip("/") # `path` should be a relative path, only grab that part @@ -248,3 +275,13 @@ def read(self, size: int = -1) -> bytes: raise ValueError("Malformed chunk encoded stream") return b"".join(buffer) + + +class ChunkedEncodingAsyncDecoder(ChunkedEncodingDecoder): + def __aiter__(self): + return self + + async def __anext__(self) -> bytes: + if self._done: + raise StopAsyncIteration + return await sync_to_async(self.read)() diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index ba9795f7c67f47..3b5397da640f0f 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -474,6 +474,7 @@ def devserver( "SENTRY_CONTROL_SILO_PORT": server_port, "SENTRY_REGION_SILO_PORT": str(ports["region.server"]), "SENTRY_DEVSERVER_BIND": f"127.0.0.1:{server_port}", + "SENTRY_GRANIAN_IFACE": "asginl", "SENTRY_GRANIAN_PORT": str(ports["server"]), "SENTRY_GRANIAN_WORKERS": "2", } diff --git a/src/sentry/services/http.py b/src/sentry/services/http.py index 9678945b00c4a8..f515a25a28a1b2 100644 --- a/src/sentry/services/http.py +++ b/src/sentry/services/http.py @@ -5,7 +5,6 @@ from typing import Any from granian import Granian -from granian.constants import Interfaces as GranianInterfaces from sentry.services.base import Service @@ -15,7 +14,7 @@ def _run_server(options: dict[str, Any]): target=options["module"], address=options["host"], port=options["port"], - interface=GranianInterfaces.WSGI, + interface=options["iface"], workers=options["workers"], backlog=options["backlog"], workers_kill_timeout=options["workers-kill-timeout"], @@ -51,6 +50,7 @@ def __init__( host = host or settings.SENTRY_WEB_HOST port = port or int(os.environ.get("SENTRY_GRANIAN_PORT", "0")) or settings.SENTRY_WEB_PORT workers = workers or int(os.environ.get("SENTRY_GRANIAN_WORKERS", "1")) + iface = os.environ.get("SENTRY_GRANIAN_IFACE", "wsgi") reload = bool(os.environ.get("SENTRY_GRANIAN_RELOAD")) options = (settings.SENTRY_WEB_OPTIONS or {}).copy() @@ -58,7 +58,8 @@ def __init__( for k, v in extra_options.items(): options[k] = v - options.setdefault("module", "sentry.wsgi:application") + options.setdefault("module", f"sentry.{iface[:4]}:application") + options.setdefault("iface", iface) options.setdefault("host", host) options.setdefault("port", port) options.setdefault("workers", workers) diff --git a/src/sentry/testutils/helpers/apigateway.py b/src/sentry/testutils/helpers/apigateway.py index 08e3c4ac4bf6f1..0c6a857c10a1d1 100644 --- a/src/sentry/testutils/helpers/apigateway.py +++ b/src/sentry/testutils/helpers/apigateway.py @@ -1,8 +1,12 @@ from __future__ import annotations +from collections.abc import Callable +from contextlib import contextmanager +from typing import Any +from unittest.mock import patch from urllib.parse import parse_qs -import responses +import httpx from django.conf import settings from django.http import HttpResponseRedirect from django.test import override_settings @@ -75,14 +79,98 @@ def get(self, request: Request) -> Response: ] + api_urls.urlpatterns +# Type for httpx mock callback: receives httpx.Request, returns (status, headers, body) +HttpxCallback = Callable[[httpx.Request], tuple[int, dict[str, str], str | bytes]] + + +class HttpxMockRouter: + """Mock HTTP router for httpx, replacing the `responses` library for async proxy tests.""" + + def __init__(self) -> None: + self._routes: list[dict[str, Any]] = [] + + def add( + self, + method: str, + url: str, + body: str | bytes = b"", + status_code: int = 200, + headers: dict[str, str] | None = None, + json_data: Any | None = None, + content_type: str | None = None, + ) -> None: + if json_data is not None: + body = json.dumps(json_data).encode() + content_type = content_type or "application/json" + elif isinstance(body, str): + body = body.encode() + resp_headers = dict(headers or {}) + if content_type: + resp_headers["Content-Type"] = content_type + self._routes.append( + { + "method": method.upper(), + "url": url, + "body": body, + "status_code": status_code, + "headers": resp_headers, + } + ) + + def add_callback(self, method: str, url: str, callback: HttpxCallback) -> None: + self._routes.append( + { + "method": method.upper(), + "url": url, + "callback": callback, + } + ) + + def handler(self, request: httpx.Request) -> httpx.Response: + url_str = str(request.url) + # Strip query params for matching + url_path = url_str.split("?")[0] + for route in self._routes: + if request.method != route["method"]: + continue + route_url = route["url"].split("?")[0] + if url_path != route_url: + continue + + if "callback" in route: + status_code, headers, body = route["callback"](request) + if isinstance(body, str): + body = body.encode() + return httpx.Response( + status_code, headers=dict(headers), content=body, request=request + ) + else: + return httpx.Response( + route["status_code"], + headers=route["headers"], + content=route["body"], + request=request, + ) + + raise ValueError(f"No mock route matched: {request.method} {url_str}") + + +@contextmanager +def mock_proxy_client(router: HttpxMockRouter): + """Patch the proxy_client with a mock httpx.AsyncClient using the given router.""" + mock_client = httpx.AsyncClient(transport=httpx.MockTransport(router.handler)) + with patch("sentry.hybridcloud.apigateway.proxy.proxy_client", mock_client): + yield mock_client + + def verify_request_body(body, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): + def request_callback(request: httpx.Request): if request.headers.get("content-type") == "application/json": - assert json.load(request.body) == body + assert json.loads(request.content) == body else: - assert request.body.read() == body + assert request.content == (body if isinstance(body, bytes) else body.encode()) assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -90,9 +178,9 @@ def request_callback(request): def verify_request_headers(headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): + def request_callback(request: httpx.Request): assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -100,10 +188,10 @@ def request_callback(request): def verify_request_params(params, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): - request_params = parse_qs(request.url.split("?")[1]) + def request_callback(request: httpx.Request): + request_params = parse_qs(str(request.url).split("?")[1]) assert (request.headers[key] == headers[key] for key in headers) for key in params: assert key in request_params @@ -117,10 +205,10 @@ def request_callback(request): def verify_file_body(file_body, headers): - """Wrapper for a callback function for responses.add_callback""" + """Wrapper for a callback function for HttpxMockRouter.add_callback.""" - def request_callback(request): - assert file_body in request.body.read() + def request_callback(request: httpx.Request): + assert file_body in request.content assert (request.headers[key] == headers[key] for key in headers) return 200, {}, json.dumps({"proxy": True}) @@ -148,34 +236,42 @@ class ApiGatewayTestCase(APITestCase): def setUp(self): super().setUp() - responses.add( - responses.GET, + self.httpx_router = HttpxMockRouter() + self.httpx_router.add( + "GET", f"{self.CELL.address}/get", body=json.dumps({"proxy": True}), content_type="application/json", - adding_headers={"test": "header"}, + headers={"test": "header"}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/error", body=json.dumps({"proxy": True}), - status=400, + status_code=400, content_type="application/json", - adding_headers={"test": "header"}, + headers={"test": "header"}, ) self.organization = self.create_organization(region=self.CELL) # Echos the request body and header back for verification - def return_request_body(request): - return (200, request.headers, request.body) + def return_request_body(request: httpx.Request): + return (200, dict(request.headers), request.content) # Echos the query params and header back for verification - def return_request_params(request): - params = parse_qs(request.url.split("?")[1]) - return (200, request.headers, json.dumps(params).encode()) + def return_request_params(request: httpx.Request): + params = parse_qs(str(request.url).split("?")[1]) + return (200, dict(request.headers), json.dumps(params).encode()) - responses.add_callback(responses.GET, f"{self.CELL.address}/echo", return_request_params) - responses.add_callback(responses.POST, f"{self.CELL.address}/echo", return_request_body) + self.httpx_router.add_callback("GET", f"{self.CELL.address}/echo", return_request_params) + self.httpx_router.add_callback("POST", f"{self.CELL.address}/echo", return_request_body) self.middleware = provision_middleware() + # Enter the mock proxy client context for the duration of the test + self._mock_proxy_ctx = mock_proxy_client(self.httpx_router) + self._mock_proxy_ctx.__enter__() + + def tearDown(self): + self._mock_proxy_ctx.__exit__(None, None, None) + super().tearDown() diff --git a/src/sentry/testutils/helpers/response.py b/src/sentry/testutils/helpers/response.py index d6681e93c6f884..036d595387182a 100644 --- a/src/sentry/testutils/helpers/response.py +++ b/src/sentry/testutils/helpers/response.py @@ -2,6 +2,7 @@ from typing import TypeGuard +from asgiref.sync import async_to_sync from django.http.response import HttpResponseBase, StreamingHttpResponse from rest_framework.response import Response @@ -16,11 +17,20 @@ def is_streaming_response(response: HttpResponseBase) -> TypeGuard[StreamingHttp return isinstance(response, StreamingHttpResponse) +async def _async_streaming_response_content(response: StreamingHttpResponse): + data = [] + async for chunk in response: + data.append(chunk) + return b"".join(data) + + def close_streaming_response(response: HttpResponseBase) -> bytes: """Exhausts the streamed file in a response. - When the file is exahusted, this underlying file descriptor is closed + When the file is exhausted, the underlying file descriptor is closed avoiding a `ResourceWarning`. """ assert isinstance(response, StreamingHttpResponse) + if response.is_async: + return async_to_sync(_async_streaming_response_content)(response) return response.getvalue() diff --git a/src/sentry/utils/http.py b/src/sentry/utils/http.py index 8cf3f48dca9159..ae5724c8b502b9 100644 --- a/src/sentry/utils/http.py +++ b/src/sentry/utils/http.py @@ -1,9 +1,10 @@ from __future__ import annotations from collections.abc import Collection, Iterator -from typing import TYPE_CHECKING, NamedTuple, TypeGuard, overload +from typing import TYPE_CHECKING, Any, NamedTuple, TypeGuard, overload from urllib.parse import quote, urljoin, urlparse +from asgiref.sync import sync_to_async from django.conf import settings from django.http import HttpRequest @@ -226,6 +227,28 @@ def is_using_customer_domain(request: HttpRequest) -> TypeGuard[_HttpRequestWith return bool(hasattr(request, "subdomain") and request.subdomain) +class BodyAsyncWrapper: + def __init__(self, body: Any): + self.body = [body] if isinstance(body, bytes) else body + + def __aiter__(self): + return BodyAsyncIter(self) + + +class BodyAsyncIter: + def __init__(self, parent: BodyAsyncWrapper): + self.biter = iter(parent.body) + + def _anext(self): + try: + return self.biter.__next__() + except StopIteration: + raise StopAsyncIteration + + async def __anext__(self) -> bytes: + return await sync_to_async(self._anext)() + + class BodyWithLength: """Wraps an HttpRequest with a __len__ so that the requests library does not assume length=0 in all cases""" @@ -235,8 +258,25 @@ def __init__(self, request: HttpRequest): def __iter__(self) -> Iterator[bytes]: return iter(self.request) + def __aiter__(self) -> BodyWithLengthAiter: + return BodyWithLengthAiter(self) + def __len__(self) -> int: return int(self.request.headers.get("Content-Length", "0")) def read(self, size: int | None = None) -> bytes: return self.request.read(size) + + +class BodyWithLengthAiter: + def __init__(self, parent: BodyWithLength): + self.biter = iter(parent.request) + + def _anext(self): + try: + return self.biter.__next__() + except StopIteration: + raise StopAsyncIteration + + async def __anext__(self) -> bytes: + return await sync_to_async(self._anext)() diff --git a/tests/sentry/hybridcloud/apigateway/test_apigateway.py b/tests/sentry/hybridcloud/apigateway/test_apigateway.py index 723305c5ab8568..f59eedc88d2fde 100644 --- a/tests/sentry/hybridcloud/apigateway/test_apigateway.py +++ b/tests/sentry/hybridcloud/apigateway/test_apigateway.py @@ -1,7 +1,6 @@ from urllib.parse import urlencode import pytest -import responses from django.conf import settings from django.test import override_settings from django.urls import get_resolver, reverse @@ -16,12 +15,11 @@ @control_silo_test(cells=[ApiGatewayTestCase.CELL], include_monolith_run=True) class ApiGatewayTest(ApiGatewayTestCase): - @responses.activate def test_simple(self) -> None: query_params = dict(foo="test", bar=["one", "two"]) headers = dict(example="this") - responses.add_callback( - responses.GET, + self.httpx_router.add_callback( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", verify_request_params(query_params, headers), ) @@ -40,13 +38,12 @@ def test_simple(self) -> None: resp_json = json.loads(close_streaming_response(resp)) assert resp_json["proxy"] is True - @responses.activate def test_proxy_does_not_resolve_redirect(self) -> None: - responses.add( - responses.POST, + self.httpx_router.add( + "POST", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", headers={"Location": "https://zombo.com"}, - status=302, + status_code=302, ) url = reverse("region-endpoint", kwargs={"organization_slug": self.organization.slug}) @@ -61,7 +58,6 @@ def test_proxy_does_not_resolve_redirect(self) -> None: response_payload = close_streaming_response(resp) assert response_payload == b"" - @responses.activate def test_cell_pinned_urls_are_defined(self) -> None: resolver = get_resolver() # Ensure that all urls in REGION_PINNED_URL_NAMES exist in api/urls.py @@ -73,18 +69,17 @@ def test_cell_pinned_urls_are_defined(self) -> None: f"REGION_PINNED_URL_NAMES contains {name}, but no route is registered with that name" ) - @responses.activate def test_proxy_check_org_slug_url(self) -> None: """Test the logic of when a request should be proxied""" - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) region_url = reverse( @@ -109,28 +104,27 @@ def test_proxy_check_org_slug_url(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_org_id_or_slug_url_with_params(self) -> None: """Test the logic of when a request should be proxied""" - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.slug}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.id}/region/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/organizations/{self.organization.id}/control/", - json={"proxy": True}, + json_data={"proxy": True}, ) region_url_slug = reverse( @@ -178,13 +172,12 @@ def test_proxy_check_org_id_or_slug_url_with_params(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_region_pinned_url(self) -> None: project_key = self.create_project_key(self.project) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/js-sdk-loader/{project_key.public_key}.js", - json={"proxy": True}, + json_data={"proxy": True}, ) # No /api/0 as we only include sentry.api.urls.urlpatterns @@ -205,17 +198,16 @@ def test_proxy_check_region_pinned_url(self) -> None: assert resp.status_code == 200 assert resp.data["proxy"] is False - @responses.activate def test_proxy_check_cell_pinned_url_with_params(self) -> None: - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/relays/register/", - json={"proxy": True}, + json_data={"proxy": True}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/relays/abc123/", - json={"proxy": True, "details": True}, + json_data={"proxy": True, "details": True}, ) with override_settings(SILO_MODE=SiloMode.CONTROL, MIDDLEWARE=tuple(self.middleware)): @@ -230,18 +222,17 @@ def test_proxy_check_cell_pinned_url_with_params(self) -> None: assert resp_json["proxy"] is True assert resp_json["details"] is True - @responses.activate def test_proxy_check_cell_pinned_issue_urls(self) -> None: issue = self.create_group() - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/issues/{issue.id}/", - json={"proxy": True, "id": issue.id}, + json_data={"proxy": True, "id": issue.id}, ) - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/issues/{issue.id}/events/", - json={"proxy": True, "id": issue.id, "events": True}, + json_data={"proxy": True, "id": issue.id, "events": True}, ) # No /api/0 as we only include sentry.api.urls.urlpatterns @@ -262,12 +253,11 @@ def test_proxy_check_cell_pinned_issue_urls(self) -> None: assert resp_json["proxy"] is True assert resp_json["events"] - @responses.activate def test_proxy_error_embed_dsn(self) -> None: - responses.add( - responses.GET, + self.httpx_router.add( + "GET", f"{self.CELL.address}/api/embed/error-page/", - json={"proxy": True, "name": "error-embed"}, + json_data={"proxy": True, "name": "error-embed"}, ) with override_settings(SILO_MODE=SiloMode.CONTROL, MIDDLEWARE=tuple(self.middleware)): # no dsn diff --git a/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py b/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py index f07005d6d73707..2064d9863d822d 100644 --- a/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py +++ b/tests/sentry/hybridcloud/apigateway/test_apigateway_helpers.py @@ -1,7 +1,4 @@ -from io import BytesIO - -import requests -import responses +import httpx from sentry.testutils.helpers.apigateway import ApiGatewayTestCase, verify_request_body from sentry.testutils.silo import no_silo_test @@ -10,14 +7,16 @@ @no_silo_test(cells=[ApiGatewayTestCase.CELL]) class VerifyRequestBodyTest(ApiGatewayTestCase): - @responses.activate def test_verify_request_body(self) -> None: body = {"ab": "cd"} headers = {"header": "nope", "content-type": "application/json"} - responses.add_callback( - responses.POST, "http://ab.cd.e/test", verify_request_body(body, headers) - ) - resp = requests.post( - "http://ab.cd.e/test", data=BytesIO(json.dumps(body).encode("utf8")), headers=headers + callback = verify_request_body(body, headers) + + mock_request = httpx.Request( + "POST", + "http://ab.cd.e/test", + headers=headers, + content=json.dumps(body).encode("utf8"), ) - assert resp.status_code == 200 + status_code, resp_headers, resp_body = callback(mock_request) + assert status_code == 200 diff --git a/tests/sentry/hybridcloud/apigateway/test_proxy.py b/tests/sentry/hybridcloud/apigateway/test_proxy.py index c8a66042dfc74d..6a44192f5111f8 100644 --- a/tests/sentry/hybridcloud/apigateway/test_proxy.py +++ b/tests/sentry/hybridcloud/apigateway/test_proxy.py @@ -1,22 +1,16 @@ -from unittest.mock import MagicMock, patch from urllib.parse import urlencode -import pytest -import requests -import responses +import httpx +from asgiref.sync import async_to_sync from django.core.files.uploadedfile import SimpleUploadedFile -from django.http import HttpResponse from django.test.client import RequestFactory -from requests.exceptions import ConnectionError, Timeout -from sentry.api.exceptions import RequestTimeout -from sentry.hybridcloud.apigateway.proxy import proxy_request +from sentry.hybridcloud.apigateway.proxy import proxy_request as _proxy_request from sentry.silo.util import ( INVALID_OUTBOUND_HEADERS, PROXY_APIGATEWAY_HEADER, PROXY_DIRECT_LOCATION_HEADER, ) -from sentry.testutils.helpers import override_options from sentry.testutils.helpers.apigateway import ( ApiGatewayTestCase, verify_file_body, @@ -27,12 +21,12 @@ from sentry.testutils.silo import control_silo_test from sentry.utils import json +proxy_request = async_to_sync(_proxy_request) url_name = "sentry-api-0-projets" @control_silo_test(cells=[ApiGatewayTestCase.CELL], include_monolith_run=True) class ProxyTestCase(ApiGatewayTestCase): - @responses.activate def test_simple(self) -> None: request = RequestFactory().get("http://sentry.io/get") resp = proxy_request(request, self.organization.slug, url_name) @@ -54,7 +48,6 @@ def test_simple(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/error" - @responses.activate def test_query_params(self) -> None: query_param_dict = dict(foo="bar", numlist=["1", "2", "3"]) query_param_str = urlencode(query_param_dict, doseq=True) @@ -68,17 +61,15 @@ def test_query_params(self) -> None: assert query_param_dict["foo"] == resp_json["foo"][0] assert query_param_dict["numlist"] == resp_json["numlist"] - @responses.activate def test_bad_org(self) -> None: request = RequestFactory().get("http://sentry.io/get") resp = proxy_request(request, "doesnotexist", url_name) assert resp.status_code == 404 - @responses.activate def test_post(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(request_body, {"test": "header"}), ) @@ -94,11 +85,10 @@ def test_post(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/post" - @responses.activate def test_put(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.PUT, + self.httpx_router.add_callback( + "PUT", "http://us.internal.sentry.io/put", verify_request_body(request_body, {"test": "header"}), ) @@ -114,11 +104,10 @@ def test_put(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/put" - @responses.activate def test_patch(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.PATCH, + self.httpx_router.add_callback( + "PATCH", "http://us.internal.sentry.io/patch", verify_request_body(request_body, {"test": "header"}), ) @@ -134,11 +123,10 @@ def test_patch(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/patch" - @responses.activate def test_head(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.HEAD, + self.httpx_router.add_callback( + "HEAD", "http://us.internal.sentry.io/head", verify_request_headers({"test": "header"}), ) @@ -154,11 +142,10 @@ def test_head(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/head" - @responses.activate def test_delete(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.DELETE, + self.httpx_router.add_callback( + "DELETE", "http://us.internal.sentry.io/delete", verify_request_body(request_body, {"test": "header"}), ) @@ -174,7 +161,6 @@ def test_delete(self) -> None: assert resp.has_header(PROXY_DIRECT_LOCATION_HEADER) assert resp[PROXY_DIRECT_LOCATION_HEADER] == "http://us.internal.sentry.io/delete" - @responses.activate def test_file_upload(self) -> None: foo = dict(test="a", file="b", what="c") contents = json.dumps(foo).encode() @@ -183,8 +169,8 @@ def test_file_upload(self) -> None: "foo": "bar", } - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_file_body(contents, {"test": "header"}), ) @@ -197,14 +183,13 @@ def test_file_upload(self) -> None: assert resp.status_code == 200 assert resp_json["proxy"] - @responses.activate def test_alternate_content_type(self) -> None: # Check form encoded files also work foo = dict(test="a", file="b", what="c") contents = urlencode(foo, doseq=True).encode("utf-8") request_body = contents - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(contents, {"test": "header"}), ) @@ -219,16 +204,15 @@ def test_alternate_content_type(self) -> None: assert resp.status_code == 200 assert resp_json["proxy"] - @responses.activate def test_apply_apigateway_proxy_header(self) -> None: - def request_callback(request: requests.PreparedRequest) -> tuple[int, dict[str, str], str]: + def request_callback(request: httpx.Request) -> tuple[int, dict[str, str], str]: assert request.headers.get(PROXY_APIGATEWAY_HEADER), ( "Proxied requests should have a header added" ) return 200, {"proxied": "yes"}, json.dumps({"success": True}) - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", request_callback, ) @@ -243,11 +227,10 @@ def request_callback(request: requests.PreparedRequest) -> tuple[int, dict[str, assert resp.status_code == 200 assert resp["proxied"] == "yes" - @responses.activate def test_strip_request_headers(self) -> None: request_body = {"foo": "bar", "nested": {"int_list": [1, 2, 3]}} - responses.add_callback( - responses.POST, + self.httpx_router.add_callback( + "POST", "http://us.internal.sentry.io/post", verify_request_body(request_body, {"test": "header"}), ) @@ -267,176 +250,3 @@ def test_strip_request_headers(self) -> None: resp = proxy_request(request, self.organization.slug, url_name) assert not any([header in resp for header in INVALID_OUTBOUND_HEADERS]) - - -CB_ENABLED = { - "apigateway.proxy.circuit-breaker.enabled": True, - "apigateway.proxy.circuit-breaker.enforce": True, -} - - -@control_silo_test(cells=[ApiGatewayTestCase.CELL]) -class ProxyCircuitBreakerTestCase(ApiGatewayTestCase): - def _make_breaker_mock(self, *, allow_request: bool) -> MagicMock: - mock_breaker = MagicMock() - mock_breaker.should_allow_request.return_value = allow_request - return mock_breaker - - @responses.activate - @override_options(CB_ENABLED) - def test_open_circuit_returns_503(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert isinstance(resp, HttpResponse) - assert resp.status_code == 503 - assert json.loads(resp.content) == { - "error": "apigateway", - "detail": "Downstream service temporarily unavailable", - } - - @responses.activate - @override_options(CB_ENABLED) - def test_circuit_breaker_keyed_per_cell(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - proxy_request(request, self.organization.slug, url_name) - key_used = mock_breaker_class.call_args.kwargs["key"] - assert key_used == f"apigateway.proxy.{self.CELL.name}" - - @responses.activate - def test_circuit_breaker_disabled_by_default(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - request = RequestFactory().get("http://sentry.io/get") - proxy_request(request, self.organization.slug, url_name) - mock_breaker_class.assert_not_called() - - @responses.activate - @override_options( - { - "apigateway.proxy.circuit-breaker.enabled": True, - "apigateway.proxy.circuit-breaker.enforce": False, - } - ) - def test_open_circuit_not_enforced(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker_class.return_value = self._make_breaker_mock(allow_request=False) - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 200 - - @responses.activate - @override_options({"apigateway.proxy.circuit-breaker.config": "invalid-lol", **CB_ENABLED}) - def test_handles_invalid_config(self) -> None: - request = RequestFactory().get("http://sentry.io/get") - res = proxy_request(request, self.organization.slug, url_name) - assert res.status_code == 200 - - @responses.activate - @override_options(CB_ENABLED) - def test_timeout_records_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/timeout", - body=Timeout(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/timeout") - with pytest.raises(RequestTimeout): - proxy_request(request, self.organization.slug, url_name) - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_connection_error_records_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/connect-error", - body=ConnectionError(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/connect-error") - with pytest.raises(ConnectionError): - proxy_request(request, self.organization.slug, url_name) - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_connection_error_records_metric(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/connect-error", - body=ConnectionError(), - ) - with patch("sentry.hybridcloud.apigateway.proxy.metrics") as mock_metrics: - request = RequestFactory().get("http://sentry.io/connect-error") - with pytest.raises(ConnectionError): - proxy_request(request, self.organization.slug, url_name) - mock_metrics.incr.assert_any_call( - "apigateway.proxy.connection_error", - tags={"region": self.CELL.name, "url_name": url_name}, - ) - - @responses.activate - @override_options(CB_ENABLED) - def test_504_response_does_record_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/server-error", - status=504, - body=json.dumps({"detail": "gateway timeout"}), - content_type="application/json", - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/server-error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 504 - mock_breaker.record_error.assert_called_once() - - @responses.activate - @override_options(CB_ENABLED) - def test_500_response_does_not_record_error(self) -> None: - responses.add( - responses.GET, - f"{self.CELL.address}/server-error", - status=500, - body=json.dumps({"detail": "internal server error"}), - content_type="application/json", - ) - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/server-error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 500 - mock_breaker.record_error.assert_not_called() - - @responses.activate - @override_options(CB_ENABLED) - def test_4xx_response_does_not_record_error(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/error") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 400 - mock_breaker.record_error.assert_not_called() - - @responses.activate - @override_options(CB_ENABLED) - def test_2xx_response_does_not_record_error(self) -> None: - with patch("sentry.hybridcloud.apigateway.proxy.CircuitBreaker") as mock_breaker_class: - mock_breaker = self._make_breaker_mock(allow_request=True) - mock_breaker_class.return_value = mock_breaker - request = RequestFactory().get("http://sentry.io/get") - resp = proxy_request(request, self.organization.slug, url_name) - assert resp.status_code == 200 - mock_breaker.record_error.assert_not_called() diff --git a/uv.lock b/uv.lock index 32dabf01919c03..f8218bac57bd66 100644 --- a/uv.lock +++ b/uv.lock @@ -949,17 +949,16 @@ http2 = [ [[package]] name = "httpx" -version = "0.25.2" +version = "0.28.1" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "anyio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "certifi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "httpcore", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "idna", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "sniffio", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/httpx-0.25.2-py3-none-any.whl", hash = "sha256:a05d3d052d9b2dfce0e3896636467f8a5342fb2b902c819428e1ac65413ca118" }, + { url = "https://pypi.devinfra.sentry.io/wheels/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad" }, ] [[package]] @@ -2153,6 +2152,7 @@ dependencies = [ { name = "grpcio", version = "1.67.0", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux')" }, { name = "grpcio", version = "1.75.1", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version >= '3.14' and sys_platform == 'linux')" }, { name = "hiredis", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "iso3166", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "jsonschema", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "lxml", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2320,6 +2320,7 @@ requires-dist = [ { name = "grpc-google-iam-v1", specifier = ">=0.13.1" }, { name = "grpcio", specifier = ">=1.67.0" }, { name = "hiredis", specifier = ">=2.3.2" }, + { name = "httpx", specifier = ">=0.28.1" }, { name = "iso3166", specifier = ">=2.1.1" }, { name = "jsonschema", specifier = ">=4.20.0" }, { name = "lxml", specifier = ">=5.3.0" }, From b6e74e3ef881244d0c27a4f780435545cc2d2565 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Tue, 24 Mar 2026 13:56:45 +0100 Subject: [PATCH 02/16] chore: change `SingleProcessSiloModeState` to use `contextvars` in place of `threading.local` --- .../objectstore/endpoints/organization.py | 2 +- src/sentry/silo/base.py | 3 +- src/sentry/testutils/helpers/response.py | 2 +- src/sentry/testutils/silo.py | 39 +++++----- .../endpoints/test_organization.py | 71 +++++++++++++++---- 5 files changed, 77 insertions(+), 40 deletions(-) diff --git a/src/sentry/objectstore/endpoints/organization.py b/src/sentry/objectstore/endpoints/organization.py index eab7e268877c02..e770110e1a9222 100644 --- a/src/sentry/objectstore/endpoints/organization.py +++ b/src/sentry/objectstore/endpoints/organization.py @@ -164,7 +164,7 @@ def get_raw_body_async( # wsgiref will raise an exception and hang when attempting to read wsgi.input while there's no body. # For now, support bodies only on PUT and POST requests when not using Granian. if request.method not in ("PUT", "POST"): - return None + return BodyAsyncWrapper(b"") # wsgiref (dev/test server) if ( diff --git a/src/sentry/silo/base.py b/src/sentry/silo/base.py index 91dade5a9e59d0..cccbf81eadcb2a 100644 --- a/src/sentry/silo/base.py +++ b/src/sentry/silo/base.py @@ -3,7 +3,6 @@ import abc import contextlib import functools -import threading import typing from collections.abc import Callable, Generator, Iterable from enum import Enum @@ -56,7 +55,7 @@ def get_current_mode(cls) -> SiloMode: return SingleProcessSiloModeState.get_mode() or process_level_silo_mode -class SingleProcessSiloModeState(threading.local): +class SingleProcessSiloModeState: """ Used by silo endpoint decorators and other contexts that help 'suggest' to acceptance testing and local single process silo testing which 'silo context' the diff --git a/src/sentry/testutils/helpers/response.py b/src/sentry/testutils/helpers/response.py index 036d595387182a..b1ae6a6521d58c 100644 --- a/src/sentry/testutils/helpers/response.py +++ b/src/sentry/testutils/helpers/response.py @@ -17,7 +17,7 @@ def is_streaming_response(response: HttpResponseBase) -> TypeGuard[StreamingHttp return isinstance(response, StreamingHttpResponse) -async def _async_streaming_response_content(response: StreamingHttpResponse): +async def _async_streaming_response_content(response: StreamingHttpResponse) -> bytes: data = [] async for chunk in response: data.append(chunk) diff --git a/src/sentry/testutils/silo.py b/src/sentry/testutils/silo.py index 7a93e28c3e660c..20d5eaaa02bbe7 100644 --- a/src/sentry/testutils/silo.py +++ b/src/sentry/testutils/silo.py @@ -1,12 +1,12 @@ from __future__ import annotations import contextlib +import contextvars import functools import inspect import os import re import sys -import threading import typing from collections.abc import Callable, Collection, Generator, Iterable, Mapping, MutableSet, Sequence from contextlib import contextmanager, nullcontext @@ -35,46 +35,43 @@ def monkey_patch_single_process_silo_mode_state(): - class LocalSiloModeState(threading.local): - mode: SiloMode | None = None - cell: Cell | None = None - - state = LocalSiloModeState() + _silo_mode_var: contextvars.ContextVar[SiloMode | None] = contextvars.ContextVar( + "silo_mode", default=None + ) + _silo_cell_var: contextvars.ContextVar[Cell | None] = contextvars.ContextVar( + "silo_cell", default=None + ) @contextlib.contextmanager def enter(mode: SiloMode, cell: Cell | None = None) -> Generator[None]: - assert state.mode is None, ( + assert _silo_mode_var.get() is None, ( "Re-entrant invariant broken! Use exit_single_process_silo_context " "to explicit pass 'fake' RPC boundaries." ) - old_mode = state.mode - old_cell = state.cell - state.mode = mode - state.cell = cell + mode_token = _silo_mode_var.set(mode) + cell_token = _silo_cell_var.set(cell) try: yield finally: - state.mode = old_mode - state.cell = old_cell + _silo_mode_var.reset(mode_token) + _silo_cell_var.reset(cell_token) @contextlib.contextmanager def exit() -> Generator[None]: - old_mode = state.mode - old_cell = state.cell - state.mode = None - state.cell = None + mode_token = _silo_mode_var.set(None) + cell_token = _silo_cell_var.set(None) try: yield finally: - state.mode = old_mode - state.cell = old_cell + _silo_mode_var.reset(mode_token) + _silo_cell_var.reset(cell_token) def get_mode() -> SiloMode | None: - return state.mode + return _silo_mode_var.get() def get_cell() -> Cell | None: - return state.cell + return _silo_cell_var.get() SingleProcessSiloModeState.enter = staticmethod(enter) # type: ignore[method-assign] SingleProcessSiloModeState.exit = staticmethod(exit) # type: ignore[method-assign] diff --git a/tests/sentry/objectstore/endpoints/test_organization.py b/tests/sentry/objectstore/endpoints/test_organization.py index c342b9c1e4c5ad..abe4032c5bbbd9 100644 --- a/tests/sentry/objectstore/endpoints/test_organization.py +++ b/tests/sentry/objectstore/endpoints/test_organization.py @@ -1,14 +1,18 @@ import os from dataclasses import asdict +from unittest.mock import patch +import httpx import pytest import requests import zstandard from django.db import connections +from django.http import HttpResponse, StreamingHttpResponse from django.urls import reverse from objectstore_client import Client, RequestError, Session, Usecase from pytest_django.live_server_helper import LiveServer +from sentry.hybridcloud.apigateway import proxy as proxy_mod from sentry.silo.base import SiloMode, SingleProcessSiloModeState from sentry.testutils.asserts import assert_status_code from sentry.testutils.cases import TransactionTestCase @@ -180,7 +184,53 @@ def setUp(self) -> None: scope_list=["project:releases"], ) + #: some shenanigans to work around async/sync hell: + # - use a "one shot" httpx client, so that we're not bound previous + # no-more existing event loops + # - patch the middleware to consume original streamed response body + # before the loop gets closed/destroyed + class HTTPXOneShotClient: + def __init__(self): + self.inner = None + + def __getattr__(self, name): + return getattr(self.inner, name) + + def build_request(self, *args, **kwargs): + self.inner = httpx.AsyncClient() + return self.inner.build_request(*args, **kwargs) + + from sentry.hybridcloud.apigateway.middleware import ApiGatewayMiddleware + + _original_middleware = ApiGatewayMiddleware._process_view_inner + + async def _eager_process_view_inner(mw_self, request, view_func, view_args, view_kwargs): + resp = await _original_middleware(mw_self, request, view_func, view_args, view_kwargs) + if isinstance(resp, StreamingHttpResponse) and resp.is_async: + body = b"" + async for chunk in resp: + body += chunk + sync_resp = HttpResponse( + content=body, + status=resp.status_code, + content_type=resp.get("Content-Type"), + ) + for header, value in resp.items(): + if header.lower() != "content-type": + sync_resp[header] = value + return sync_resp + return resp + + self._apigateway_patch = patch.object(proxy_mod, "proxy_client", HTTPXOneShotClient()) + self._middleware_patch = patch.object( + ApiGatewayMiddleware, "_process_view_inner", _eager_process_view_inner + ) + self._apigateway_patch.start() + self._middleware_patch.start() + def tearDown(self) -> None: + self._middleware_patch.stop() + self._apigateway_patch.stop() for conn in connections.all(): conn.close() super().tearDown() @@ -205,8 +255,6 @@ def test_health(self): follow=True, ) assert response.status_code == 200 - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] def test_full_cycle(self): config = asdict(test_region) @@ -225,7 +273,7 @@ def test_full_cycle(self): follow=True, ) assert_status_code(response, 201) - object_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + object_key = json.loads(response.content)["key"] assert object_key is not None response = self.client.get( @@ -234,8 +282,7 @@ def test_full_cycle(self): follow=True, ) assert_status_code(response, 200) - retrieved_data = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved_data == b"test data" + assert response.content == b"test data" response = self.client.put( f"{base_url}{object_key}", @@ -245,7 +292,7 @@ def test_full_cycle(self): follow=True, ) assert_status_code(response, 200) - new_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + new_key = json.loads(response.content)["key"] assert new_key == object_key response = self.client.get( @@ -254,8 +301,7 @@ def test_full_cycle(self): follow=True, ) assert_status_code(response, 200) - retrieved = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved == b"new data" + assert response.content == b"new data" response = self.client.delete( f"{base_url}{object_key}", @@ -263,8 +309,6 @@ def test_full_cycle(self): follow=True, ) assert_status_code(response, 204) - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] response = self.client.get( f"{base_url}{object_key}", @@ -272,8 +316,6 @@ def test_full_cycle(self): follow=True, ) assert_status_code(response, 404) - # consume body to close connection - b"".join(response.streaming_content) # type: ignore[attr-defined] def test_roundtrip_compressed(self): config = asdict(test_region) @@ -297,7 +339,7 @@ def test_roundtrip_compressed(self): follow=True, ) assert_status_code(response, 201) - object_key = json.loads(b"".join(response.streaming_content))["key"] # type: ignore[attr-defined] + object_key = json.loads(response.content)["key"] assert object_key is not None response = self.client.get( @@ -306,5 +348,4 @@ def test_roundtrip_compressed(self): follow=True, ) assert_status_code(response, 200) - retrieved = b"".join(response.streaming_content) # type: ignore[attr-defined] - assert retrieved == data + assert response.content == data From e74f1fdb0ddeda7deec54ba4e7929be384b9b87f Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Tue, 24 Mar 2026 16:22:02 +0100 Subject: [PATCH 03/16] fix: port `middleware/test_proxy.py` to async context --- src/sentry/testutils/asserts.py | 6 +-- src/sentry/testutils/cases.py | 67 +++++++++++++++------------ tests/sentry/middleware/test_proxy.py | 28 ++++++++++- 3 files changed, 66 insertions(+), 35 deletions(-) diff --git a/src/sentry/testutils/asserts.py b/src/sentry/testutils/asserts.py index e8c07edd4832ee..5b08264859f6f6 100644 --- a/src/sentry/testutils/asserts.py +++ b/src/sentry/testutils/asserts.py @@ -2,7 +2,6 @@ from django.db import models from django.db.models.functions import Cast -from django.http import StreamingHttpResponse from sentry.constants import ObjectStatus from sentry.integrations.types import EventLifecycleOutcome @@ -44,10 +43,7 @@ def assert_commit_shape(commit): def assert_status_code(response, minimum: int, maximum: int | None = None): # Omit max to assert status_code == minimum. maximum = maximum or minimum + 1 - assert minimum <= response.status_code < maximum, ( - response.status_code, - response.getvalue() if isinstance(response, StreamingHttpResponse) else response.content, - ) + assert minimum <= response.status_code < maximum, response def assert_existing_projects_status( diff --git a/src/sentry/testutils/cases.py b/src/sentry/testutils/cases.py index a1202f16f7f456..87f435889ed01d 100644 --- a/src/sentry/testutils/cases.py +++ b/src/sentry/testutils/cases.py @@ -17,6 +17,7 @@ from uuid import UUID, uuid4 from zlib import compress +import httpx import pytest import requests import responses @@ -36,7 +37,6 @@ from django.utils import timezone from django.utils.functional import cached_property from google.protobuf.timestamp_pb2 import Timestamp -from requests.utils import CaseInsensitiveDict, get_encoding_from_headers from rest_framework import status from rest_framework.request import Request from rest_framework.response import Response @@ -678,36 +678,45 @@ def get_cursor_headers(self, response): def api_gateway_proxy_stubbed(self): """Mocks a fake api gateway proxy that redirects via Client objects""" - def proxy_raw_request( - method: str, - url: str, - headers: Mapping[str, str], - params: Mapping[str, str] | None, - data: Any, - **kwds: Any, - ) -> requests.Response: - from django.test.client import Client - - client = Client() - extra: Mapping[str, Any] = { - f"HTTP_{k.replace('-', '_').upper()}": v for k, v in headers.items() - } - if params: - url += "?" + urlencode(params) - with assume_test_silo_mode(SiloMode.CELL): - resp = getattr(client, method.lower())( - url, b"".join(data), headers["Content-Type"], **extra - ) - response = requests.Response() - response.status_code = resp.status_code - response.headers = CaseInsensitiveDict(resp.headers) - response.encoding = get_encoding_from_headers(response.headers) - response.raw = BytesIO(resp.content) - return response + from asgiref.sync import sync_to_async + from django.test.client import Client + + class MockedProxy: + def __init__(self): + self.client = Client() + + @staticmethod + async def _consume_body(content): + ret = b"" + async for chunk in content: + ret += chunk + return ret + + def build_request(self, method, url, headers, params, content, timeout): + assert not params + target = getattr(self.client, method.lower()) + content_type = headers.pop("Content-Type", "application/octet-stream") + extra: Mapping[str, Any] = { + f"HTTP_{k.replace('-', '_').upper()}": v for k, v in headers.items() + } + return target, (url, content, content_type), extra + + async def send(self, req, stream, follow_redirects): + with assume_test_silo_mode(SiloMode.CELL): + url, content, content_type = req[1] + content = await self._consume_body(content) + resp = await sync_to_async(req[0])(url, content, content_type, **req[2]) + wresp = httpx.Response( + status_code=resp.status_code, + headers=dict(resp.headers), + content=resp.content, + ) + return wresp + mock_client = MockedProxy() with mock.patch( - "sentry.hybridcloud.apigateway.proxy.external_request", - new=proxy_raw_request, + "sentry.hybridcloud.apigateway.proxy.proxy_client", + new=mock_client, ): yield diff --git a/tests/sentry/middleware/test_proxy.py b/tests/sentry/middleware/test_proxy.py index 09043bbe2b5838..cd58ff879e868c 100644 --- a/tests/sentry/middleware/test_proxy.py +++ b/tests/sentry/middleware/test_proxy.py @@ -1,6 +1,8 @@ from __future__ import annotations +import asyncio from functools import cached_property +from unittest.mock import patch from django.http import HttpRequest @@ -8,6 +10,7 @@ from sentry.models.team import Team from sentry.silo.base import SiloMode from sentry.testutils.cases import APITestCase, TestCase +from sentry.testutils.helpers.response import close_streaming_response from sentry.testutils.silo import assume_test_silo_mode, control_silo_test from sentry.types.cell import Cell, RegionCategory from sentry.utils import json @@ -48,6 +51,29 @@ class FakedAPIProxyTest(APITestCase): endpoint = "sentry-api-0-organization-teams" method = "post" + def setUp(self) -> None: + super().setUp() + + from sentry.hybridcloud.apigateway.middleware import ApiGatewayMiddleware + + _original_middleware = ApiGatewayMiddleware._process_view_inner + + def _process_view_match(self, request, view_func, view_args, view_kwargs): + try: + asyncio.get_running_loop() + return self._process_view_inner(request, view_func, view_args, view_kwargs) + except RuntimeError: + return self._process_view_sync(request, view_func, view_args, view_kwargs) + + self._middleware_patch = patch.object( + ApiGatewayMiddleware, "_process_view_match", _process_view_match + ) + self._middleware_patch.start() + + def tearDown(self) -> None: + self._middleware_patch.stop() + super().tearDown() + def test_through_api_gateway(self) -> None: if SiloMode.get_current_mode() == SiloMode.MONOLITH: return @@ -62,7 +88,7 @@ def test_through_api_gateway(self) -> None: status_code=201, ) - result = json.loads(resp.getvalue()) + result = json.loads(close_streaming_response(resp)) with assume_test_silo_mode(SiloMode.CELL): team = Team.objects.get(id=result["id"]) assert team.idp_provisioned From 556cdc89215df6329d9cd289f26c1fc5e9038f55 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Wed, 25 Mar 2026 13:09:50 +0100 Subject: [PATCH 04/16] chore: make CODEOWNERS check happy --- .github/CODEOWNERS | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 13781d0ce9ff22..bf060641ee32f9 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -35,6 +35,7 @@ /src/sentry/projectoptions/ @getsentry/app-backend /src/sentry/receivers/ @getsentry/app-backend /src/sentry/ratelimits/ @getsentry/app-backend +/src/sentry/asgi.py @getsentry/app-backend /tests/js/sentry-test/ @getsentry/app-frontend /static/app/utils/ @getsentry/app-frontend From 2398fdc458d249fb52c81d3824074ff9e6ad1c03 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Thu, 26 Mar 2026 12:36:51 +0100 Subject: [PATCH 05/16] fix: proxy acceptance test --- src/sentry/hybridcloud/apigateway/proxy.py | 6 ++++++ src/sentry/testutils/helpers/response.py | 3 +++ tests/acceptance/test_proxy.py | 3 ++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/sentry/hybridcloud/apigateway/proxy.py b/src/sentry/hybridcloud/apigateway/proxy.py index 08618d224b7eca..5764c3ce3d50b3 100644 --- a/src/sentry/hybridcloud/apigateway/proxy.py +++ b/src/sentry/hybridcloud/apigateway/proxy.py @@ -146,6 +146,7 @@ async def proxy_cell_request( target_url = urljoin(cell.address, request.path) content_encoding = request.headers.get("Content-Encoding") + content_length = request.headers.get("Content-Length") header_dict = clean_proxy_headers(request.headers) header_dict[PROXY_APIGATEWAY_HEADER] = "true" @@ -165,6 +166,11 @@ async def proxy_cell_request( data = get_raw_body_async(request) else: data = BodyAsyncWrapper(request.body) + # With request streaming, and without `Content-Length` header, + # `httpx` will set chunked transfer encoding. Upstream doesn't support that, + # thus we re-add this header if it was present in the original request. + if content_length: + header_dict["Content-Length"] = content_length try: with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags): diff --git a/src/sentry/testutils/helpers/response.py b/src/sentry/testutils/helpers/response.py index b1ae6a6521d58c..87d359c443bc2b 100644 --- a/src/sentry/testutils/helpers/response.py +++ b/src/sentry/testutils/helpers/response.py @@ -21,6 +21,9 @@ async def _async_streaming_response_content(response: StreamingHttpResponse) -> data = [] async for chunk in response: data.append(chunk) + iterator = response._iterator # type: ignore[attr-defined] + if hasattr(iterator, "aclose"): + await iterator.aclose() return b"".join(data) diff --git a/tests/acceptance/test_proxy.py b/tests/acceptance/test_proxy.py index b49285d62305f5..98c8a39871bd6d 100644 --- a/tests/acceptance/test_proxy.py +++ b/tests/acceptance/test_proxy.py @@ -17,6 +17,7 @@ from sentry.testutils.cases import TransactionTestCase from sentry.testutils.cell import override_cells from sentry.testutils.factories import Factories +from sentry.testutils.helpers.response import close_streaming_response from sentry.testutils.silo import cell_silo_test from sentry.types.cell import Cell from sentry.utils import json @@ -69,6 +70,6 @@ def test_through_api_gateway(self) -> None: ) assert_status_code(resp, 201) - result = json.loads(resp.getvalue()) + result = json.loads(close_streaming_response(resp)) team = Team.objects.get(id=result["id"]) assert team.idp_provisioned From 39706a42b03d9df787217af2acb740a2c3cf62d2 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Thu, 26 Mar 2026 13:03:00 +0100 Subject: [PATCH 06/16] chore(apigateway): skip body aiter if empty --- src/sentry/hybridcloud/apigateway/proxy.py | 7 ++++--- src/sentry/utils/http.py | 4 ++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/sentry/hybridcloud/apigateway/proxy.py b/src/sentry/hybridcloud/apigateway/proxy.py index 5764c3ce3d50b3..d5ffcc865c6c2e 100644 --- a/src/sentry/hybridcloud/apigateway/proxy.py +++ b/src/sentry/hybridcloud/apigateway/proxy.py @@ -167,8 +167,9 @@ async def proxy_cell_request( else: data = BodyAsyncWrapper(request.body) # With request streaming, and without `Content-Length` header, - # `httpx` will set chunked transfer encoding. Upstream doesn't support that, - # thus we re-add this header if it was present in the original request. + # `httpx` will set chunked transfer encoding. + # Upstream doesn't necessarily support this, + # thus we re-add the header if it was present in the original request. if content_length: header_dict["Content-Length"] = content_length @@ -179,7 +180,7 @@ async def proxy_cell_request( target_url, headers=header_dict, params=dict(query_params) if query_params is not None else None, - content=_stream_request(data), # type: ignore[arg-type] + content=_stream_request(data) if data else None, # type: ignore[arg-type] timeout=timeout, ) resp = await proxy_client.send(req, stream=True, follow_redirects=False) diff --git a/src/sentry/utils/http.py b/src/sentry/utils/http.py index ae5724c8b502b9..53d15211b2d82f 100644 --- a/src/sentry/utils/http.py +++ b/src/sentry/utils/http.py @@ -229,8 +229,12 @@ def is_using_customer_domain(request: HttpRequest) -> TypeGuard[_HttpRequestWith class BodyAsyncWrapper: def __init__(self, body: Any): + self._bool = bool(body) self.body = [body] if isinstance(body, bytes) else body + def __bool__(self) -> bool: + return self._bool + def __aiter__(self): return BodyAsyncIter(self) From 6785c1d9fa0951f1e813f152ccce7ef982b12e12 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Thu, 26 Mar 2026 15:27:13 +0100 Subject: [PATCH 07/16] chore: bypass options calls from sdk logging integration when in async ctx --- src/sentry/utils/sdk.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/sentry/utils/sdk.py b/src/sentry/utils/sdk.py index a71e5ba4c11fbf..04d7916bdcb0c9 100644 --- a/src/sentry/utils/sdk.py +++ b/src/sentry/utils/sdk.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import copy import logging import sys @@ -268,6 +269,15 @@ def before_send_log(log: Log, _: Hint) -> Log | None: if attributes.get("sentry.message.template") == "New partitions assigned: %r": return None + try: + # FIXME: when in ASGI, the call to `options.store` from `in_random_rollout` + # would fail, because of SyncOnlyOperation. + # While we should ideally figure out how to actually fix this, + # for the moment let's just simplify and skip this entirely. + asyncio.get_running_loop() + return None + except Exception: + pass if in_random_rollout("ourlogs.sentry-emit-rollout"): return log return None From 602e39a913692e60b2beb367944b4514def62e48 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Fri, 27 Mar 2026 14:12:19 +0100 Subject: [PATCH 08/16] chore: move async apigateway in own module --- src/sentry/conf/server.py | 8 +- .../hybridcloud/apigateway/apigateway.py | 12 +- .../hybridcloud/apigateway/middleware.py | 54 +---- src/sentry/hybridcloud/apigateway/proxy.py | 148 ++++++++------ .../hybridcloud/apigateway_async/__init__.py | 3 + .../apigateway_async/apigateway.py | 108 ++++++++++ .../apigateway_async/middleware.py | 78 +++++++ .../hybridcloud/apigateway_async/proxy.py | 190 ++++++++++++++++++ .../objectstore/endpoints/organization.py | 2 +- src/sentry/runner/commands/devserver.py | 1 + src/sentry/testutils/cases.py | 2 +- src/sentry/testutils/helpers/apigateway.py | 8 +- tests/conftest.py | 3 + .../hybridcloud/apigateway/test_proxy.py | 2 +- tests/sentry/middleware/test_proxy.py | 2 +- .../endpoints/test_organization.py | 2 +- 16 files changed, 494 insertions(+), 129 deletions(-) create mode 100644 src/sentry/hybridcloud/apigateway_async/__init__.py create mode 100644 src/sentry/hybridcloud/apigateway_async/apigateway.py create mode 100644 src/sentry/hybridcloud/apigateway_async/middleware.py create mode 100644 src/sentry/hybridcloud/apigateway_async/proxy.py diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 7f448f1cc9ed36..ac554fe899d60b 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -371,6 +371,12 @@ def env( # so that responses aren't modified after Content-Length is set, or have the # response modifying middleware reset the Content-Length header. # This is because CommonMiddleware Sets the Content-Length header for non-streaming responses. +APIGW_ASYNC = os.environ.get("SENTRY_ASYNC_APIGW", "").lower() in ("1", "true", "y", "yes") +APIGW_MIDDLEWARE = ( + "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" + if APIGW_ASYNC + else "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware" +) MIDDLEWARE: tuple[str, ...] = ( "csp.middleware.CSPMiddleware", "sentry.middleware.health.HealthCheck", @@ -387,7 +393,7 @@ def env( "sentry.middleware.auth.AuthenticationMiddleware", "sentry.middleware.ai_agent.AIAgentMiddleware", "sentry.middleware.integrations.IntegrationControlMiddleware", - "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware", + APIGW_MIDDLEWARE, "sentry.middleware.demo_mode_guard.DemoModeGuardMiddleware", "sentry.middleware.customer_domain.CustomerDomainMiddleware", "sentry.middleware.sudo.SudoMiddleware", diff --git a/src/sentry/hybridcloud/apigateway/apigateway.py b/src/sentry/hybridcloud/apigateway/apigateway.py index 9dd2172ddd59e6..3f77e204fd180e 100644 --- a/src/sentry/hybridcloud/apigateway/apigateway.py +++ b/src/sentry/hybridcloud/apigateway/apigateway.py @@ -30,10 +30,8 @@ def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset return endpoint_silo_limit.modes -async def proxy_request_if_needed( - request: Request, - view_func: Callable[..., HttpResponseBase], - view_kwargs: dict[str, Any], +def proxy_request_if_needed( + request: Request, view_func: Callable[..., HttpResponseBase], view_kwargs: dict[str, Any] ) -> HttpResponseBase | None: """ Main execution flow for the API Gateway. @@ -63,7 +61,7 @@ async def proxy_request_if_needed( "kind": "orgslug", }, ) - return await proxy_request(request, org_id_or_slug, url_name) + return proxy_request(request, org_id_or_slug, url_name) if url_name == "sentry-error-page-embed" and "dsn" in request.GET: # Error embed modal is special as customers can't easily use cell URLs. @@ -75,7 +73,7 @@ async def proxy_request_if_needed( "kind": "error-embed", }, ) - return await proxy_error_embed_request(request, dsn, url_name) + return proxy_error_embed_request(request, dsn, url_name) if ( request.resolver_match @@ -90,7 +88,7 @@ async def proxy_request_if_needed( }, ) - return await proxy_cell_request(request, cell, url_name) + return proxy_cell_request(request, cell, url_name) if url_name != "unknown": # If we know the URL but didn't proxy it record we could be missing diff --git a/src/sentry/hybridcloud/apigateway/middleware.py b/src/sentry/hybridcloud/apigateway/middleware.py index 7fbcb13f7c19f9..4c67467aac8302 100644 --- a/src/sentry/hybridcloud/apigateway/middleware.py +++ b/src/sentry/hybridcloud/apigateway/middleware.py @@ -1,10 +1,8 @@ from __future__ import annotations -import asyncio from collections.abc import Callable from typing import Any -from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction from django.http.response import HttpResponseBase from rest_framework.request import Request @@ -14,22 +12,12 @@ class ApiGatewayMiddleware: """Proxy requests intended for remote silos""" - async_capable = True - sync_capable = True - def __init__(self, get_response: Callable[[Request], HttpResponseBase]): self.get_response = get_response - if iscoroutinefunction(self.get_response): - markcoroutinefunction(self) - def __call__(self, request: Request) -> Any: - if iscoroutinefunction(self): - return self.__acall__(request) + def __call__(self, request: Request) -> HttpResponseBase: return self.get_response(request) - async def __acall__(self, request: Request) -> HttpResponseBase: - return await self.get_response(request) # type: ignore[misc] - def process_view( self, request: Request, @@ -37,42 +25,8 @@ def process_view( view_args: tuple[str], view_kwargs: dict[str, Any], ) -> HttpResponseBase | None: - return self._process_view_match(request, view_func, view_args, view_kwargs) - - def _process_view_match( - self, - request: Request, - view_func: Callable[..., HttpResponseBase], - view_args: tuple[str], - view_kwargs: dict[str, Any], - ) -> Any: - #: we check if we're in an async or sync runtime once, then - # overwrite the method with the actual impl. - try: - asyncio.get_running_loop() - method = self._process_view_inner - except RuntimeError: - method = self._process_view_sync # type: ignore[assignment] - setattr(self, "_process_view_match", method) - return method(request, view_func, view_args, view_kwargs) - - def _process_view_sync( - self, - request: Request, - view_func: Callable[..., HttpResponseBase], - view_args: tuple[str], - view_kwargs: dict[str, Any], - ) -> HttpResponseBase | None: - return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs) - - async def _process_view_inner( - self, - request: Request, - view_func: Callable[..., HttpResponseBase], - view_args: tuple[str], - view_kwargs: dict[str, Any], - ) -> HttpResponseBase | None: - proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs) + proxy_response = proxy_request_if_needed(request, view_func, view_kwargs) if proxy_response is not None: return proxy_response - return None + else: + return None diff --git a/src/sentry/hybridcloud/apigateway/proxy.py b/src/sentry/hybridcloud/apigateway/proxy.py index d5ffcc865c6c2e..aae730476e26b1 100644 --- a/src/sentry/hybridcloud/apigateway/proxy.py +++ b/src/sentry/hybridcloud/apigateway/proxy.py @@ -4,21 +4,21 @@ from __future__ import annotations -import asyncio import logging -from collections.abc import AsyncGenerator, AsyncIterator +from collections.abc import Generator from urllib.parse import urljoin, urlparse from wsgiref.util import is_hop_by_hop -import httpx -from asgiref.sync import sync_to_async from django.conf import settings -from django.http import HttpRequest, HttpResponse, StreamingHttpResponse +from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse from django.http.response import HttpResponseBase +from requests import Response as ExternalResponse +from requests import request as external_request +from requests.exceptions import Timeout from sentry import options from sentry.api.exceptions import RequestTimeout -from sentry.objectstore.endpoints.organization import get_raw_body_async +from sentry.objectstore.endpoints.organization import ChunkedEncodingDecoder, get_raw_body from sentry.silo.util import ( PROXY_APIGATEWAY_HEADER, PROXY_DIRECT_LOCATION_HEADER, @@ -32,12 +32,11 @@ get_cell_for_organization, ) from sentry.utils import metrics -from sentry.utils.http import BodyAsyncWrapper +from sentry.utils.circuit_breaker2 import CircuitBreaker, CountBasedTripStrategy +from sentry.utils.http import BodyWithLength logger = logging.getLogger(__name__) -proxy_client = httpx.AsyncClient() - # Endpoints that handle uploaded files have higher timeouts configured # and we need to honor those timeouts when proxying. # See frontend/templates/sites-enabled/sentry.io in getsentry/ops @@ -56,28 +55,21 @@ PROXY_CHUNK_SIZE = 512 * 1024 -async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator[bytes]: - """Yield chunks from an httpx response and close the connection when done.""" - try: - async for chunk in response.aiter_bytes(PROXY_CHUNK_SIZE): - yield chunk - finally: - await response.aclose() - - -def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpResponse: - """Convert an httpx Response into a Django response.""" +def _parse_response(response: ExternalResponse, remote_url: str) -> StreamingHttpResponse: + """ + Convert the Responses class from requests into the drf Response + """ - new_headers = clean_outbound_headers(response.headers) - content_type = new_headers.pop("Content-Type", None) + def stream_response() -> Generator[bytes]: + yield from response.iter_content(PROXY_CHUNK_SIZE) streamed_response = StreamingHttpResponse( - streaming_content=_stream_response_and_close(response), + streaming_content=stream_response(), status=response.status_code, - content_type=content_type, + content_type=response.headers.pop("Content-Type", None), ) - - for header, value in new_headers.items(): + # Add Headers to response + for header, value in response.headers.items(): if not is_hop_by_hop(header): streamed_response[header] = value @@ -85,28 +77,19 @@ def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpR return streamed_response -async def _stream_request(body: AsyncIterator[bytes]) -> AsyncGenerator[bytes]: - async for chunk in body: - yield chunk - - -async def proxy_request( - request: HttpRequest, - org_id_or_slug: str, - url_name: str, -) -> HttpResponseBase: +def proxy_request(request: HttpRequest, org_id_or_slug: str, url_name: str) -> HttpResponseBase: """Take a django request object and proxy it to a remote location given an org_id_or_slug""" try: - cell = await sync_to_async(get_cell_for_organization)(org_id_or_slug) + cell = get_cell_for_organization(org_id_or_slug) except CellResolutionError as e: logger.info("region_resolution_error", extra={"org_slug": org_id_or_slug, "error": str(e)}) return HttpResponse(status=404) - return await proxy_cell_request(request, cell, url_name) + return proxy_cell_request(request, cell, url_name) -async def proxy_error_embed_request( +def proxy_error_embed_request( request: HttpRequest, dsn: str, url_name: str ) -> HttpResponseBase | None: try: @@ -126,7 +109,7 @@ async def proxy_error_embed_request( # If we don't have a o123.ingest.{cell}.{app_host} style domain # we forward to the monolith cell cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION) - return await proxy_cell_request(request, cell, url_name) + return proxy_cell_request(request, cell, url_name) try: cell_offset = len(app_segments) + 1 cell_segment = host_segments[cell_offset * -1] @@ -134,57 +117,96 @@ async def proxy_error_embed_request( except Exception: return None - return await proxy_cell_request(request, cell, url_name) + return proxy_cell_request(request, cell, url_name) -async def proxy_cell_request( - request: HttpRequest, - cell: Cell, - url_name: str, -) -> StreamingHttpResponse: +def proxy_cell_request(request: HttpRequest, cell: Cell, url_name: str) -> HttpResponseBase: """Take a django request object and proxy it to a cell silo""" + + metric_tags = {"region": cell.name, "url_name": url_name} + circuit_breaker: CircuitBreaker | None = None + # TODO(mark) remove rollout options + if options.get("apigateway.proxy.circuit-breaker.enabled"): + try: + circuit_breaker = CircuitBreaker( + key=f"apigateway.proxy.{cell.name}", + config=options.get("apigateway.proxy.circuit-breaker.config"), + trip_strategy=CountBasedTripStrategy.from_config( + options.get("apigateway.proxy.circuit-breaker.config") + ), + ) + except Exception as e: + logger.warning("apigateway.invalid-breaker-config", extra={"message": str(e)}) + + if circuit_breaker is not None: + if not circuit_breaker.should_allow_request(): + metrics.incr("apigateway.proxy.circuit_breaker.rejected", tags=metric_tags) + if options.get("apigateway.proxy.circuit-breaker.enforce"): + body = { + "error": "apigateway", + "detail": "Downstream service temporarily unavailable", + } + return JsonResponse(body, status=503) + target_url = urljoin(cell.address, request.path) content_encoding = request.headers.get("Content-Encoding") - content_length = request.headers.get("Content-Length") header_dict = clean_proxy_headers(request.headers) header_dict[PROXY_APIGATEWAY_HEADER] = "true" + # TODO: use requests session for connection pooling capabilities assert request.method is not None query_params = request.GET - timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT) - metric_tags = {"region": cell.name, "url_name": url_name} + # This option has a default of None, which is cast to 0 + timeout = options.get("apigateway.proxy.timeout") + if not timeout: + timeout = settings.GATEWAY_PROXY_TIMEOUT + timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, timeout) # XXX: See sentry.testutils.pytest.sentry for more information if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"): return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404) + data: bytes | Generator[bytes] | ChunkedEncodingDecoder | BodyWithLength | None = None if url_name == "sentry-api-0-organization-objectstore": if content_encoding: header_dict["Content-Encoding"] = content_encoding - data = get_raw_body_async(request) + data = get_raw_body(request) else: - data = BodyAsyncWrapper(request.body) - # With request streaming, and without `Content-Length` header, - # `httpx` will set chunked transfer encoding. - # Upstream doesn't necessarily support this, - # thus we re-add the header if it was present in the original request. - if content_length: - header_dict["Content-Length"] = content_length + data = BodyWithLength(request) try: with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags): - req = proxy_client.build_request( + resp = external_request( request.method, - target_url, + url=target_url, headers=header_dict, params=dict(query_params) if query_params is not None else None, - content=_stream_request(data) if data else None, # type: ignore[arg-type] + data=data, + stream=True, timeout=timeout, + # By default, external_request will resolve any redirects for any verb except for HEAD. + # We explicitly disable this behavior to avoid misrepresenting the original sentry.io request with the + # body response of the redirect. + allow_redirects=False, ) - resp = await proxy_client.send(req, stream=True, follow_redirects=False) - return _adapt_response(resp, target_url) - except (httpx.TimeoutException, asyncio.CancelledError): + except Timeout: + metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags) + try: + if circuit_breaker is not None: + circuit_breaker.record_error() + except Exception: + logger.exception("Failed to record circuitbreaker failure") + # remote silo timeout. Use DRF timeout instead raise RequestTimeout() + + if resp.status_code >= 500 and circuit_breaker is not None: + metrics.incr("apigateway.proxy.request_failed", tags=metric_tags) + circuit_breaker.record_error() + + new_headers = clean_outbound_headers(resp.headers) + resp.headers.clear() + resp.headers.update(new_headers) + return _parse_response(resp, target_url) diff --git a/src/sentry/hybridcloud/apigateway_async/__init__.py b/src/sentry/hybridcloud/apigateway_async/__init__.py new file mode 100644 index 00000000000000..e4b8561f6bee7d --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/__init__.py @@ -0,0 +1,3 @@ +from .apigateway import proxy_request_if_needed + +__all__ = ("proxy_request_if_needed",) diff --git a/src/sentry/hybridcloud/apigateway_async/apigateway.py b/src/sentry/hybridcloud/apigateway_async/apigateway.py new file mode 100644 index 00000000000000..3e02e6613aee17 --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/apigateway.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import logging +from collections.abc import Callable +from typing import Any + +from django.conf import settings +from django.http.response import HttpResponseBase +from rest_framework.request import Request + +from sentry.silo.base import SiloLimit, SiloMode +from sentry.types.cell import get_cell_by_name +from sentry.utils import metrics + +from .proxy import ( + proxy_cell_request, + proxy_error_embed_request, + proxy_request, +) + +logger = logging.getLogger(__name__) + + +def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset[SiloMode] | None: + view_class = getattr(view_func, "view_class", None) + if not view_class: + return None + if not hasattr(view_class, "silo_limit"): + return None + endpoint_silo_limit: SiloLimit = view_class.silo_limit + return endpoint_silo_limit.modes + + +async def proxy_request_if_needed( + request: Request, + view_func: Callable[..., HttpResponseBase], + view_kwargs: dict[str, Any], +) -> HttpResponseBase | None: + """ + Main execution flow for the API Gateway. + returns None if proxying is not required, or a response if the proxy was successful. + """ + current_silo_mode = SiloMode.get_current_mode() + if current_silo_mode != SiloMode.CONTROL: + return None + + silo_modes = _get_view_silo_mode(view_func) + if not silo_modes or current_silo_mode in silo_modes: + return None + + url_name = "unknown" + if request.resolver_match: + url_name = request.resolver_match.url_name or url_name + + if "organization_slug" in view_kwargs or "organization_id_or_slug" in view_kwargs: + org_id_or_slug = str( + view_kwargs.get("organization_slug") or view_kwargs.get("organization_id_or_slug", "") + ) + + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "orgslug", + }, + ) + return await proxy_request(request, org_id_or_slug, url_name) + + if url_name == "sentry-error-page-embed" and "dsn" in request.GET: + # Error embed modal is special as customers can't easily use cell URLs. + dsn = request.GET["dsn"] + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "error-embed", + }, + ) + return await proxy_error_embed_request(request, dsn, url_name) + + if ( + request.resolver_match + and request.resolver_match.url_name in settings.REGION_PINNED_URL_NAMES + ): + cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION) + metrics.incr( + "apigateway.proxy_request", + tags={ + "url_name": url_name, + "kind": "regionpin", + }, + ) + + return await proxy_cell_request(request, cell, url_name) + + if url_name != "unknown": + # If we know the URL but didn't proxy it record we could be missing + # URL handling and that needs to be fixed. + metrics.incr( + "apigateway.proxy_request", + tags={ + "kind": "noop", + "url_name": url_name, + }, + ) + logger.info("apigateway.unknown_url", extra={"url": request.path}) + + return None diff --git a/src/sentry/hybridcloud/apigateway_async/middleware.py b/src/sentry/hybridcloud/apigateway_async/middleware.py new file mode 100644 index 00000000000000..446210ee1836d2 --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/middleware.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from typing import Any + +from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction +from django.http.response import HttpResponseBase +from rest_framework.request import Request + +from . import proxy_request_if_needed + + +class ApiGatewayMiddleware: + """Proxy requests intended for remote silos""" + + async_capable = True + sync_capable = True + + def __init__(self, get_response: Callable[[Request], HttpResponseBase]): + self.get_response = get_response + if iscoroutinefunction(self.get_response): + markcoroutinefunction(self) + + def __call__(self, request: Request) -> Any: + if iscoroutinefunction(self): + return self.__acall__(request) + return self.get_response(request) + + async def __acall__(self, request: Request) -> HttpResponseBase: + return await self.get_response(request) # type: ignore[misc] + + def process_view( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + return self._process_view_match(request, view_func, view_args, view_kwargs) + + def _process_view_match( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> Any: + #: we check if we're in an async or sync runtime once, then + # overwrite the method with the actual impl. + try: + asyncio.get_running_loop() + method = self._process_view_inner + except RuntimeError: + method = self._process_view_sync # type: ignore[assignment] + setattr(self, "_process_view_match", method) + return method(request, view_func, view_args, view_kwargs) + + def _process_view_sync( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs) + + async def _process_view_inner( + self, + request: Request, + view_func: Callable[..., HttpResponseBase], + view_args: tuple[str], + view_kwargs: dict[str, Any], + ) -> HttpResponseBase | None: + proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs) + if proxy_response is not None: + return proxy_response + return None diff --git a/src/sentry/hybridcloud/apigateway_async/proxy.py b/src/sentry/hybridcloud/apigateway_async/proxy.py new file mode 100644 index 00000000000000..d5ffcc865c6c2e --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/proxy.py @@ -0,0 +1,190 @@ +""" +Utilities related to proxying a request to a cell +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import AsyncGenerator, AsyncIterator +from urllib.parse import urljoin, urlparse +from wsgiref.util import is_hop_by_hop + +import httpx +from asgiref.sync import sync_to_async +from django.conf import settings +from django.http import HttpRequest, HttpResponse, StreamingHttpResponse +from django.http.response import HttpResponseBase + +from sentry import options +from sentry.api.exceptions import RequestTimeout +from sentry.objectstore.endpoints.organization import get_raw_body_async +from sentry.silo.util import ( + PROXY_APIGATEWAY_HEADER, + PROXY_DIRECT_LOCATION_HEADER, + clean_outbound_headers, + clean_proxy_headers, +) +from sentry.types.cell import ( + Cell, + CellResolutionError, + get_cell_by_name, + get_cell_for_organization, +) +from sentry.utils import metrics +from sentry.utils.http import BodyAsyncWrapper + +logger = logging.getLogger(__name__) + +proxy_client = httpx.AsyncClient() + +# Endpoints that handle uploaded files have higher timeouts configured +# and we need to honor those timeouts when proxying. +# See frontend/templates/sites-enabled/sentry.io in getsentry/ops +ENDPOINT_TIMEOUT_OVERRIDE = { + "sentry-api-0-chunk-upload": 90.0, + "sentry-api-0-organization-release-files": 90.0, + "sentry-api-0-project-release-files": 90.0, + "sentry-api-0-dsym-files": 90.0, + "sentry-api-0-installable-preprod-artifact-download": 90.0, + "sentry-api-0-project-preprod-artifact-download": 90.0, + "sentry-api-0-organization-preprod-artifact-size-analysis-download": 90.0, + "sentry-api-0-organization-objectstore": 90.0, +} + +# stream 0.5 MB at a time +PROXY_CHUNK_SIZE = 512 * 1024 + + +async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator[bytes]: + """Yield chunks from an httpx response and close the connection when done.""" + try: + async for chunk in response.aiter_bytes(PROXY_CHUNK_SIZE): + yield chunk + finally: + await response.aclose() + + +def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpResponse: + """Convert an httpx Response into a Django response.""" + + new_headers = clean_outbound_headers(response.headers) + content_type = new_headers.pop("Content-Type", None) + + streamed_response = StreamingHttpResponse( + streaming_content=_stream_response_and_close(response), + status=response.status_code, + content_type=content_type, + ) + + for header, value in new_headers.items(): + if not is_hop_by_hop(header): + streamed_response[header] = value + + streamed_response[PROXY_DIRECT_LOCATION_HEADER] = remote_url + return streamed_response + + +async def _stream_request(body: AsyncIterator[bytes]) -> AsyncGenerator[bytes]: + async for chunk in body: + yield chunk + + +async def proxy_request( + request: HttpRequest, + org_id_or_slug: str, + url_name: str, +) -> HttpResponseBase: + """Take a django request object and proxy it to a remote location given an org_id_or_slug""" + + try: + cell = await sync_to_async(get_cell_for_organization)(org_id_or_slug) + except CellResolutionError as e: + logger.info("region_resolution_error", extra={"org_slug": org_id_or_slug, "error": str(e)}) + return HttpResponse(status=404) + + return await proxy_cell_request(request, cell, url_name) + + +async def proxy_error_embed_request( + request: HttpRequest, dsn: str, url_name: str +) -> HttpResponseBase | None: + try: + parsed = urlparse(dsn) + except Exception as err: + logger.info("apigateway.error_embed.invalid_dsn", extra={"dsn": dsn, "error": err}) + return None + host = parsed.netloc + app_host = urlparse(options.get("system.url-prefix")).netloc + if not host.endswith(app_host): + # Don't further parse URLs that aren't for us. + return None + + app_segments = app_host.split(".") + host_segments = host.split(".") + if len(host_segments) - len(app_segments) < 3: + # If we don't have a o123.ingest.{cell}.{app_host} style domain + # we forward to the monolith cell + cell = get_cell_by_name(settings.SENTRY_MONOLITH_REGION) + return await proxy_cell_request(request, cell, url_name) + try: + cell_offset = len(app_segments) + 1 + cell_segment = host_segments[cell_offset * -1] + cell = get_cell_by_name(cell_segment) + except Exception: + return None + + return await proxy_cell_request(request, cell, url_name) + + +async def proxy_cell_request( + request: HttpRequest, + cell: Cell, + url_name: str, +) -> StreamingHttpResponse: + """Take a django request object and proxy it to a cell silo""" + target_url = urljoin(cell.address, request.path) + + content_encoding = request.headers.get("Content-Encoding") + content_length = request.headers.get("Content-Length") + header_dict = clean_proxy_headers(request.headers) + header_dict[PROXY_APIGATEWAY_HEADER] = "true" + + assert request.method is not None + query_params = request.GET + + timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT) + metric_tags = {"region": cell.name, "url_name": url_name} + + # XXX: See sentry.testutils.pytest.sentry for more information + if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"): + return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404) + + if url_name == "sentry-api-0-organization-objectstore": + if content_encoding: + header_dict["Content-Encoding"] = content_encoding + data = get_raw_body_async(request) + else: + data = BodyAsyncWrapper(request.body) + # With request streaming, and without `Content-Length` header, + # `httpx` will set chunked transfer encoding. + # Upstream doesn't necessarily support this, + # thus we re-add the header if it was present in the original request. + if content_length: + header_dict["Content-Length"] = content_length + + try: + with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags): + req = proxy_client.build_request( + request.method, + target_url, + headers=header_dict, + params=dict(query_params) if query_params is not None else None, + content=_stream_request(data) if data else None, # type: ignore[arg-type] + timeout=timeout, + ) + resp = await proxy_client.send(req, stream=True, follow_redirects=False) + return _adapt_response(resp, target_url) + except (httpx.TimeoutException, asyncio.CancelledError): + # remote silo timeout. Use DRF timeout instead + raise RequestTimeout() diff --git a/src/sentry/objectstore/endpoints/organization.py b/src/sentry/objectstore/endpoints/organization.py index e770110e1a9222..54794d3afb4c85 100644 --- a/src/sentry/objectstore/endpoints/organization.py +++ b/src/sentry/objectstore/endpoints/organization.py @@ -174,7 +174,7 @@ def get_raw_body_async( return ChunkedEncodingAsyncDecoder(wsgi_input._read) # type: ignore[union-attr] # wsgiref and the request has been already proxied through control silo - return BodyWithLength(request).__aiter__() + return BodyWithLength(request) def get_target_url(path: str) -> str: diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 3b5397da640f0f..7ab82ee5d6f6bf 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -471,6 +471,7 @@ def devserver( "SENTRY_SILO_DEVSERVER": "1", "SENTRY_SILO_MODE": "CONTROL", "SENTRY_REGION": "", + "SENTRY_ASYNC_APIGW": "true", "SENTRY_CONTROL_SILO_PORT": server_port, "SENTRY_REGION_SILO_PORT": str(ports["region.server"]), "SENTRY_DEVSERVER_BIND": f"127.0.0.1:{server_port}", diff --git a/src/sentry/testutils/cases.py b/src/sentry/testutils/cases.py index 87f435889ed01d..486513a7ca3e2d 100644 --- a/src/sentry/testutils/cases.py +++ b/src/sentry/testutils/cases.py @@ -715,7 +715,7 @@ async def send(self, req, stream, follow_redirects): mock_client = MockedProxy() with mock.patch( - "sentry.hybridcloud.apigateway.proxy.proxy_client", + "sentry.hybridcloud.apigateway_async.proxy.proxy_client", new=mock_client, ): yield diff --git a/src/sentry/testutils/helpers/apigateway.py b/src/sentry/testutils/helpers/apigateway.py index 0c6a857c10a1d1..bc3cde3f74f5cf 100644 --- a/src/sentry/testutils/helpers/apigateway.py +++ b/src/sentry/testutils/helpers/apigateway.py @@ -159,7 +159,7 @@ def handler(self, request: httpx.Request) -> httpx.Response: def mock_proxy_client(router: HttpxMockRouter): """Patch the proxy_client with a mock httpx.AsyncClient using the given router.""" mock_client = httpx.AsyncClient(transport=httpx.MockTransport(router.handler)) - with patch("sentry.hybridcloud.apigateway.proxy.proxy_client", mock_client): + with patch("sentry.hybridcloud.apigateway_async.proxy.proxy_client", mock_client): yield mock_client @@ -217,8 +217,10 @@ def request_callback(request: httpx.Request): def provision_middleware(): middleware = list(settings.MIDDLEWARE) - if "sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware" not in middleware: - middleware = ["sentry.hybridcloud.apigateway.middleware.ApiGatewayMiddleware"] + middleware + if "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" not in middleware: + middleware = [ + "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" + ] + middleware return middleware diff --git a/tests/conftest.py b/tests/conftest.py index 4f47d7f85ccffe..c49928870c06c7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,9 @@ import responses import sentry_sdk +# Set async apigateway as soon as possible +os.environ["SENTRY_ASYNC_APIGW"] = "true" + # Disable crash recovery server in pytest-rerunfailures. Under xdist, Sentry's # global socket.setdefaulttimeout(5) causes the server's per-worker recv threads # to die during Django init (~10s), silently breaking crash recovery anyway. diff --git a/tests/sentry/hybridcloud/apigateway/test_proxy.py b/tests/sentry/hybridcloud/apigateway/test_proxy.py index 6a44192f5111f8..68bbabf4645910 100644 --- a/tests/sentry/hybridcloud/apigateway/test_proxy.py +++ b/tests/sentry/hybridcloud/apigateway/test_proxy.py @@ -5,7 +5,7 @@ from django.core.files.uploadedfile import SimpleUploadedFile from django.test.client import RequestFactory -from sentry.hybridcloud.apigateway.proxy import proxy_request as _proxy_request +from sentry.hybridcloud.apigateway_async.proxy import proxy_request as _proxy_request from sentry.silo.util import ( INVALID_OUTBOUND_HEADERS, PROXY_APIGATEWAY_HEADER, diff --git a/tests/sentry/middleware/test_proxy.py b/tests/sentry/middleware/test_proxy.py index cd58ff879e868c..57406f423b0def 100644 --- a/tests/sentry/middleware/test_proxy.py +++ b/tests/sentry/middleware/test_proxy.py @@ -54,7 +54,7 @@ class FakedAPIProxyTest(APITestCase): def setUp(self) -> None: super().setUp() - from sentry.hybridcloud.apigateway.middleware import ApiGatewayMiddleware + from sentry.hybridcloud.apigateway_async.middleware import ApiGatewayMiddleware _original_middleware = ApiGatewayMiddleware._process_view_inner diff --git a/tests/sentry/objectstore/endpoints/test_organization.py b/tests/sentry/objectstore/endpoints/test_organization.py index abe4032c5bbbd9..07101516d7a8f1 100644 --- a/tests/sentry/objectstore/endpoints/test_organization.py +++ b/tests/sentry/objectstore/endpoints/test_organization.py @@ -200,7 +200,7 @@ def build_request(self, *args, **kwargs): self.inner = httpx.AsyncClient() return self.inner.build_request(*args, **kwargs) - from sentry.hybridcloud.apigateway.middleware import ApiGatewayMiddleware + from sentry.hybridcloud.apigateway_async.middleware import ApiGatewayMiddleware _original_middleware = ApiGatewayMiddleware._process_view_inner From 52d23a4ecc89607e82581100eaeb2360796be7d0 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Fri, 27 Mar 2026 15:01:59 +0100 Subject: [PATCH 09/16] feat(apigateway): implement circuitbreakers in async variant --- src/sentry/conf/server.py | 3 + .../apigateway_async/circuitbreaker.py | 100 ++++++++++++++++++ .../hybridcloud/apigateway_async/proxy.py | 90 ++++++++++------ 3 files changed, 163 insertions(+), 30 deletions(-) create mode 100644 src/sentry/hybridcloud/apigateway_async/circuitbreaker.py diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index ac554fe899d60b..f4bc43aed2786b 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -3187,6 +3187,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: } # Used in tests to skip forwarding relay paths to a region silo that does not exist. APIGATEWAY_PROXY_SKIP_RELAY = False +APIGATEWAY_PROXY_MAX_CONCURRENCY = 100 +APIGATEWAY_PROXY_MAX_FAILURES = 100 +APIGATEWAY_PROXY_FAILURE_WINDOW = 60 # Shared resource ids for accounting EVENT_PROCESSING_STORE = "rc_processing_redis" diff --git a/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py new file mode 100644 index 00000000000000..87b59b5b709e4c --- /dev/null +++ b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import asyncio +import time +from collections import defaultdict + +from django.conf import settings + + +class CircuitBreaker: + __slots__ = [ + "concurrency", + "counter_window", + "failures", + "semaphore", + "_clock", + "_counters", + "_counter_idx", + ] + + def __init__(self, concurrency, failures): + self.concurrency = concurrency + self.counter_window = failures[0] + self.failures = failures[1] + self.semaphore = asyncio.Semaphore(self.concurrency) + self._clock = 0 + self._counters = [0, 0] + self._counter_idx = 0 + + def _counter_flip(self, clock): + self._clock = clock + prev = self._counter_idx + self._counter_idx = 1 - prev + self._counters[prev] = 0 + + def _maybe_counter_flip(self): + now = int(time.monotonic()) + delta = now - self._clock + if delta > 0: + if delta // self.counter_window: + self._counter_flip(now) + + def counter_incr(self): + self._maybe_counter_flip() + self._counters[self._counter_idx] += 1 + + def window_overflow(self) -> bool: + return self._counters[self._counter_idx] > self.failures + + def overflow(self) -> bool: + return self.semaphore.locked() + + def ctx(self) -> CircuitBreakerCtx: + return CircuitBreakerCtx(self) + + +class CircuitBreakerOverflow(Exception): ... + + +class CircuitBreakerWindowOverflow(Exception): ... + + +class CircuitBreakerCtx: + __slots__ = ["cb"] + + def __init__(self, cb: CircuitBreaker): + self.cb = cb + + def incr_failures(self): + self.cb.counter_incr() + + async def __aenter__(self): + if self.cb.overflow(): + raise CircuitBreakerOverflow + await self.cb.semaphore.acquire() + if self.cb.window_overflow(): + self.cb.semaphore.release() + raise CircuitBreakerWindowOverflow + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + self.cb.semaphore.release() + + +class CircuitBreakerManager: + __slots__ = ["objs"] + + def __init__( + self, + max_concurrency: int | None = None, + failures: int | None = None, + failure_window: int | None = None, + ): + concurrency = max_concurrency or settings.APIGATEWAY_PROXY_MAX_CONCURRENCY + failures = failures or settings.APIGATEWAY_PROXY_MAX_FAILURES + failure_window = failure_window or settings.APIGATEWAY_PROXY_FAILURE_WINDOW + self.objs = defaultdict(lambda: CircuitBreaker(concurrency, (failure_window, failures))) + + def get(self, key: str) -> CircuitBreakerCtx: + return self.objs[key].ctx() diff --git a/src/sentry/hybridcloud/apigateway_async/proxy.py b/src/sentry/hybridcloud/apigateway_async/proxy.py index d5ffcc865c6c2e..3d9900b20a27c4 100644 --- a/src/sentry/hybridcloud/apigateway_async/proxy.py +++ b/src/sentry/hybridcloud/apigateway_async/proxy.py @@ -13,7 +13,7 @@ import httpx from asgiref.sync import sync_to_async from django.conf import settings -from django.http import HttpRequest, HttpResponse, StreamingHttpResponse +from django.http import HttpRequest, HttpResponse, JsonResponse, StreamingHttpResponse from django.http.response import HttpResponseBase from sentry import options @@ -34,9 +34,16 @@ from sentry.utils import metrics from sentry.utils.http import BodyAsyncWrapper +from .circuitbreaker import ( + CircuitBreakerManager, + CircuitBreakerOverflow, + CircuitBreakerWindowOverflow, +) + logger = logging.getLogger(__name__) proxy_client = httpx.AsyncClient() +circuitbreakers = CircuitBreakerManager() # Endpoints that handle uploaded files have higher timeouts configured # and we need to honor those timeouts when proxying. @@ -141,8 +148,9 @@ async def proxy_cell_request( request: HttpRequest, cell: Cell, url_name: str, -) -> StreamingHttpResponse: +) -> HttpResponseBase: """Take a django request object and proxy it to a cell silo""" + metric_tags = {"region": cell.name, "url_name": url_name} target_url = urljoin(cell.address, request.path) content_encoding = request.headers.get("Content-Encoding") @@ -154,37 +162,59 @@ async def proxy_cell_request( query_params = request.GET timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT) - metric_tags = {"region": cell.name, "url_name": url_name} # XXX: See sentry.testutils.pytest.sentry for more information if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"): return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404) - if url_name == "sentry-api-0-organization-objectstore": - if content_encoding: - header_dict["Content-Encoding"] = content_encoding - data = get_raw_body_async(request) - else: - data = BodyAsyncWrapper(request.body) - # With request streaming, and without `Content-Length` header, - # `httpx` will set chunked transfer encoding. - # Upstream doesn't necessarily support this, - # thus we re-add the header if it was present in the original request. - if content_length: - header_dict["Content-Length"] = content_length - try: - with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags): - req = proxy_client.build_request( - request.method, - target_url, - headers=header_dict, - params=dict(query_params) if query_params is not None else None, - content=_stream_request(data) if data else None, # type: ignore[arg-type] - timeout=timeout, - ) - resp = await proxy_client.send(req, stream=True, follow_redirects=False) - return _adapt_response(resp, target_url) - except (httpx.TimeoutException, asyncio.CancelledError): - # remote silo timeout. Use DRF timeout instead - raise RequestTimeout() + async with circuitbreakers.get(cell.name) as circuitbreaker: + if url_name == "sentry-api-0-organization-objectstore": + if content_encoding: + header_dict["Content-Encoding"] = content_encoding + data = get_raw_body_async(request) + else: + data = BodyAsyncWrapper(request.body) + # With request streaming, and without `Content-Length` header, + # `httpx` will set chunked transfer encoding. + # Upstream doesn't necessarily support this, + # thus we re-add the header if it was present in the original request. + if content_length: + header_dict["Content-Length"] = content_length + + try: + with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags): + req = proxy_client.build_request( + request.method, + target_url, + headers=header_dict, + params=dict(query_params) if query_params is not None else None, + content=_stream_request(data) if data else None, # type: ignore[arg-type] + timeout=timeout, + ) + resp = await proxy_client.send(req, stream=True, follow_redirects=False) + if resp.status_code >= 502: + metrics.incr("apigateway.proxy.request_failed", tags=metric_tags) + circuitbreaker.incr_failures() + return _adapt_response(resp, target_url) + except (httpx.TimeoutException, asyncio.CancelledError): + metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags) + circuitbreaker.incr_failures() + # remote silo timeout. Use DRF timeout instead + raise RequestTimeout() + except httpx.RequestError: + metrics.incr("apigateway.proxy.request_failed", tags=metric_tags) + circuitbreaker.incr_failures() + raise + except CircuitBreakerOverflow: + metrics.incr("apigateway.proxy.circuit_breaker.overflow", tags=metric_tags) + return JsonResponse( + {"error": "apigateway", "detail": "Too many requests"}, + status=429, + ) + except CircuitBreakerWindowOverflow: + metrics.incr("apigateway.proxy.circuit_breaker.rejected", tags=metric_tags) + return JsonResponse( + {"error": "apigateway", "detail": "Downstream service temporarily unavailable"}, + status=503, + ) From bb62cbf9fc0ea03db498c5873115fe759b46c88d Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Fri, 27 Mar 2026 15:21:23 +0100 Subject: [PATCH 10/16] chore: make mypy happy again --- .../apigateway_async/circuitbreaker.py | 19 +++++++++++-------- .../objectstore/endpoints/organization.py | 4 ++-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py index 87b59b5b709e4c..5272d6de84d3c2 100644 --- a/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py +++ b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py @@ -3,6 +3,7 @@ import asyncio import time from collections import defaultdict +from typing import Any from django.conf import settings @@ -18,7 +19,7 @@ class CircuitBreaker: "_counter_idx", ] - def __init__(self, concurrency, failures): + def __init__(self, concurrency: int, failures: tuple[int, int]) -> None: self.concurrency = concurrency self.counter_window = failures[0] self.failures = failures[1] @@ -27,20 +28,20 @@ def __init__(self, concurrency, failures): self._counters = [0, 0] self._counter_idx = 0 - def _counter_flip(self, clock): + def _counter_flip(self, clock: int) -> None: self._clock = clock prev = self._counter_idx self._counter_idx = 1 - prev self._counters[prev] = 0 - def _maybe_counter_flip(self): + def _maybe_counter_flip(self) -> None: now = int(time.monotonic()) delta = now - self._clock if delta > 0: if delta // self.counter_window: self._counter_flip(now) - def counter_incr(self): + def counter_incr(self) -> None: self._maybe_counter_flip() self._counters[self._counter_idx] += 1 @@ -66,10 +67,10 @@ class CircuitBreakerCtx: def __init__(self, cb: CircuitBreaker): self.cb = cb - def incr_failures(self): + def incr_failures(self) -> None: self.cb.counter_incr() - async def __aenter__(self): + async def __aenter__(self) -> CircuitBreakerCtx: if self.cb.overflow(): raise CircuitBreakerOverflow await self.cb.semaphore.acquire() @@ -78,7 +79,7 @@ async def __aenter__(self): raise CircuitBreakerWindowOverflow return self - async def __aexit__(self, exc_type, exc_value, exc_tb): + async def __aexit__(self, exc_type: Any, exc_value: Any, exc_tb: Any) -> None: self.cb.semaphore.release() @@ -94,7 +95,9 @@ def __init__( concurrency = max_concurrency or settings.APIGATEWAY_PROXY_MAX_CONCURRENCY failures = failures or settings.APIGATEWAY_PROXY_MAX_FAILURES failure_window = failure_window or settings.APIGATEWAY_PROXY_FAILURE_WINDOW - self.objs = defaultdict(lambda: CircuitBreaker(concurrency, (failure_window, failures))) + self.objs: dict[str, CircuitBreaker] = defaultdict( + lambda: CircuitBreaker(concurrency, (failure_window, failures)) + ) def get(self, key: str) -> CircuitBreakerCtx: return self.objs[key].ctx() diff --git a/src/sentry/objectstore/endpoints/organization.py b/src/sentry/objectstore/endpoints/organization.py index 54794d3afb4c85..5bb22190d1a37f 100644 --- a/src/sentry/objectstore/endpoints/organization.py +++ b/src/sentry/objectstore/endpoints/organization.py @@ -12,7 +12,7 @@ from rest_framework.request import Request from rest_framework.response import Response -from sentry.utils.http import BodyAsyncWrapper, BodyWithLength, BodyWithLengthAiter +from sentry.utils.http import BodyAsyncWrapper, BodyWithLength # TODO(granian): Remove this and related code paths when we fully switch from uwsgi to granian uwsgi: Any = None @@ -153,7 +153,7 @@ def stream_generator(): def get_raw_body_async( request: HttpRequest, -) -> BodyAsyncWrapper | ChunkedEncodingAsyncDecoder | BodyWithLengthAiter | None: +) -> BodyAsyncWrapper | ChunkedEncodingAsyncDecoder | BodyWithLength: if request.body: return BodyAsyncWrapper(request.body) From 8760f635e048ebdd22c47ea243487834eaf8af70 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Fri, 27 Mar 2026 15:50:02 +0100 Subject: [PATCH 11/16] fix: objectstore apigateway tests --- tests/sentry/objectstore/endpoints/test_organization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sentry/objectstore/endpoints/test_organization.py b/tests/sentry/objectstore/endpoints/test_organization.py index 07101516d7a8f1..3dbc8df6020051 100644 --- a/tests/sentry/objectstore/endpoints/test_organization.py +++ b/tests/sentry/objectstore/endpoints/test_organization.py @@ -12,7 +12,7 @@ from objectstore_client import Client, RequestError, Session, Usecase from pytest_django.live_server_helper import LiveServer -from sentry.hybridcloud.apigateway import proxy as proxy_mod +from sentry.hybridcloud.apigateway_async import proxy as proxy_mod from sentry.silo.base import SiloMode, SingleProcessSiloModeState from sentry.testutils.asserts import assert_status_code from sentry.testutils.cases import TransactionTestCase From 89fc58865659dc739fc9adec2306c7f97151d467 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Fri, 27 Mar 2026 15:50:24 +0100 Subject: [PATCH 12/16] chore: enable `uvloop` extra in granian dependency for ASGI --- pyproject.toml | 2 +- uv.lock | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 085dcd0fee9c88..60264b09ad04b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ dependencies = [ "google-cloud-storage-transfer>=1.17.0", "google-crc32c>=1.6.0", "googleapis-common-protos>=1.63.2", - "granian[pname,reload]>=2.7", + "granian[pname,reload,uvloop]>=2.7", "grpc-google-iam-v1>=0.13.1", # Note, grpcio>1.30.0 requires setting GRPC_POLL_STRATEGY=epoll1 # See https://github.com/grpc/grpc/issues/23796 and diff --git a/uv.lock b/uv.lock index f8218bac57bd66..228ef9aead21f2 100644 --- a/uv.lock +++ b/uv.lock @@ -796,6 +796,9 @@ pname = [ reload = [ { name = "watchfiles", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] +uvloop = [ + { name = "uvloop", marker = "(platform_python_implementation == 'CPython' and sys_platform == 'darwin') or (platform_python_implementation == 'CPython' and sys_platform == 'linux')" }, +] [[package]] name = "grpc-google-iam-v1" @@ -2147,7 +2150,7 @@ dependencies = [ { name = "google-cloud-storage-transfer", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "google-crc32c", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "googleapis-common-protos", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, - { name = "granian", extra = ["pname", "reload"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "granian", extra = ["pname", "reload", "uvloop"], marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpc-google-iam-v1", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "grpcio", version = "1.67.0", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux')" }, { name = "grpcio", version = "1.75.1", source = { registry = "https://pypi.devinfra.sentry.io/simple" }, marker = "(python_full_version >= '3.14' and sys_platform == 'darwin') or (python_full_version >= '3.14' and sys_platform == 'linux')" }, @@ -2316,7 +2319,7 @@ requires-dist = [ { name = "google-cloud-storage-transfer", specifier = ">=1.17.0" }, { name = "google-crc32c", specifier = ">=1.6.0" }, { name = "googleapis-common-protos", specifier = ">=1.63.2" }, - { name = "granian", extras = ["pname", "reload"], specifier = ">=2.7" }, + { name = "granian", extras = ["pname", "reload", "uvloop"], specifier = ">=2.7" }, { name = "grpc-google-iam-v1", specifier = ">=0.13.1" }, { name = "grpcio", specifier = ">=1.67.0" }, { name = "hiredis", specifier = ">=2.3.2" }, @@ -3088,6 +3091,17 @@ socks = [ { name = "pysocks", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] +[[package]] +name = "uvloop" +version = "0.21.0" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:bfd55dfcc2a512316e65f16e503e9e450cab148ef11df4e4e679b5e8253a5281" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:787ae31ad8a2856fc4e7c095341cccc7209bd657d0e71ad0dc2ea83c4a6fa8af" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5ee4d4ef48036ff6e5cfffb09dd192c7a5027153948d85b8da7ff705065bacc6" }, + { url = "https://pypi.devinfra.sentry.io/wheels/uvloop-0.21.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f3df876acd7ec037a3d005b3ab85a7e4110422e4d9c1571d4fc89b0fc41b6816" }, +] + [[package]] name = "virtualenv" version = "20.26.6" From 12eb8df63e150ee983f1f4647bbfab41ee645f51 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 27 Mar 2026 17:06:02 +0000 Subject: [PATCH 13/16] Fix circuit breaker window recovery check Co-authored-by: Armen Zambrano G. Applied via @cursor push command --- src/sentry/hybridcloud/apigateway_async/circuitbreaker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py index 5272d6de84d3c2..e43500fccfa9e6 100644 --- a/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py +++ b/src/sentry/hybridcloud/apigateway_async/circuitbreaker.py @@ -46,6 +46,7 @@ def counter_incr(self) -> None: self._counters[self._counter_idx] += 1 def window_overflow(self) -> bool: + self._maybe_counter_flip() return self._counters[self._counter_idx] > self.failures def overflow(self) -> bool: From 944123b1516ac9de93ff0c225a19a854cb558ed8 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Mon, 30 Mar 2026 12:11:50 +0200 Subject: [PATCH 14/16] fix: apigateway async proxy `_adapt_response` content type translation --- src/sentry/hybridcloud/apigateway_async/proxy.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/sentry/hybridcloud/apigateway_async/proxy.py b/src/sentry/hybridcloud/apigateway_async/proxy.py index 3d9900b20a27c4..7fed6928dfaed8 100644 --- a/src/sentry/hybridcloud/apigateway_async/proxy.py +++ b/src/sentry/hybridcloud/apigateway_async/proxy.py @@ -8,7 +8,6 @@ import logging from collections.abc import AsyncGenerator, AsyncIterator from urllib.parse import urljoin, urlparse -from wsgiref.util import is_hop_by_hop import httpx from asgiref.sync import sync_to_async @@ -75,8 +74,9 @@ async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpResponse: """Convert an httpx Response into a Django response.""" + if content_type := response.headers.get("Content-Type", None): + del response.headers["Content-Type"] new_headers = clean_outbound_headers(response.headers) - content_type = new_headers.pop("Content-Type", None) streamed_response = StreamingHttpResponse( streaming_content=_stream_response_and_close(response), @@ -85,8 +85,7 @@ def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpR ) for header, value in new_headers.items(): - if not is_hop_by_hop(header): - streamed_response[header] = value + streamed_response[header] = value streamed_response[PROXY_DIRECT_LOCATION_HEADER] = remote_url return streamed_response From 71f31e39953c99381ece78e18c9800a9b78051a8 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Mon, 30 Mar 2026 12:17:08 +0200 Subject: [PATCH 15/16] chore: allow apigateway proxy config through env vars --- src/sentry/conf/server.py | 8 ++++---- src/sentry/runner/commands/devserver.py | 2 +- tests/conftest.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index f4bc43aed2786b..02e2e9f64e8827 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -371,7 +371,7 @@ def env( # so that responses aren't modified after Content-Length is set, or have the # response modifying middleware reset the Content-Length header. # This is because CommonMiddleware Sets the Content-Length header for non-streaming responses. -APIGW_ASYNC = os.environ.get("SENTRY_ASYNC_APIGW", "").lower() in ("1", "true", "y", "yes") +APIGW_ASYNC = os.environ.get("SENTRY_APIGW_ASYNC", "").lower() in ("1", "true", "y", "yes") APIGW_MIDDLEWARE = ( "sentry.hybridcloud.apigateway_async.middleware.ApiGatewayMiddleware" if APIGW_ASYNC @@ -3187,9 +3187,9 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: } # Used in tests to skip forwarding relay paths to a region silo that does not exist. APIGATEWAY_PROXY_SKIP_RELAY = False -APIGATEWAY_PROXY_MAX_CONCURRENCY = 100 -APIGATEWAY_PROXY_MAX_FAILURES = 100 -APIGATEWAY_PROXY_FAILURE_WINDOW = 60 +APIGATEWAY_PROXY_MAX_CONCURRENCY = int(os.environ.get("SENTRY_APIGW_PROXY_MAX_CONCURRENCY", 100)) +APIGATEWAY_PROXY_MAX_FAILURES = int(os.environ.get("SENTRY_APIGW_PROXY_MAX_FAILURES", 100)) +APIGATEWAY_PROXY_FAILURE_WINDOW = int(os.environ.get("SENTRY_APIGW_PROXY_FAILURE_WINDOW", 60)) # Shared resource ids for accounting EVENT_PROCESSING_STORE = "rc_processing_redis" diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 7ab82ee5d6f6bf..6a1e8a526b8588 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -471,7 +471,7 @@ def devserver( "SENTRY_SILO_DEVSERVER": "1", "SENTRY_SILO_MODE": "CONTROL", "SENTRY_REGION": "", - "SENTRY_ASYNC_APIGW": "true", + "SENTRY_APIGW_ASYNC": "true", "SENTRY_CONTROL_SILO_PORT": server_port, "SENTRY_REGION_SILO_PORT": str(ports["region.server"]), "SENTRY_DEVSERVER_BIND": f"127.0.0.1:{server_port}", diff --git a/tests/conftest.py b/tests/conftest.py index c49928870c06c7..efb241a151aae9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ import sentry_sdk # Set async apigateway as soon as possible -os.environ["SENTRY_ASYNC_APIGW"] = "true" +os.environ["SENTRY_APIGW_ASYNC"] = "true" # Disable crash recovery server in pytest-rerunfailures. Under xdist, Sentry's # global socket.setdefaulttimeout(5) causes the server's per-worker recv threads From e1951af302a08acf893e27b002afe82ce8a36392 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Mon, 30 Mar 2026 17:14:45 +0200 Subject: [PATCH 16/16] chore: rebase --- src/sentry/hybridcloud/apigateway/proxy.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/sentry/hybridcloud/apigateway/proxy.py b/src/sentry/hybridcloud/apigateway/proxy.py index aae730476e26b1..9f13ce9b8a6f2c 100644 --- a/src/sentry/hybridcloud/apigateway/proxy.py +++ b/src/sentry/hybridcloud/apigateway/proxy.py @@ -14,7 +14,7 @@ from django.http.response import HttpResponseBase from requests import Response as ExternalResponse from requests import request as external_request -from requests.exceptions import Timeout +from requests.exceptions import ConnectionError, Timeout from sentry import options from sentry.api.exceptions import RequestTimeout @@ -193,18 +193,22 @@ def proxy_cell_request(request: HttpRequest, cell: Cell, url_name: str) -> HttpR ) except Timeout: metrics.incr("apigateway.proxy.request_timeout", tags=metric_tags) - try: - if circuit_breaker is not None: - circuit_breaker.record_error() - except Exception: - logger.exception("Failed to record circuitbreaker failure") + if circuit_breaker is not None: + circuit_breaker.record_error() # remote silo timeout. Use DRF timeout instead raise RequestTimeout() + except ConnectionError: + metrics.incr("apigateway.proxy.connection_error", tags=metric_tags) + if circuit_breaker is not None: + circuit_breaker.record_error() + + raise - if resp.status_code >= 500 and circuit_breaker is not None: + if resp.status_code >= 502: metrics.incr("apigateway.proxy.request_failed", tags=metric_tags) - circuit_breaker.record_error() + if circuit_breaker is not None: + circuit_breaker.record_error() new_headers = clean_outbound_headers(resp.headers) resp.headers.clear()