Skip to content

Commit 6a7fe9d

Browse files
committed
wip
1 parent 6d197c8 commit 6a7fe9d

File tree

10 files changed

+376
-170
lines changed

10 files changed

+376
-170
lines changed

src/sentry/hybridcloud/apigateway/apigateway.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ def _get_view_silo_mode(view_func: Callable[..., HttpResponseBase]) -> frozenset
3131

3232

3333
async def proxy_request_if_needed(
34-
request: Request, view_func: Callable[..., HttpResponseBase], view_kwargs: dict[str, Any]
34+
request: Request,
35+
view_func: Callable[..., HttpResponseBase],
36+
view_kwargs: dict[str, Any],
3537
) -> HttpResponseBase | None:
3638
"""
3739
Main execution flow for the API Gateway.
@@ -73,7 +75,7 @@ async def proxy_request_if_needed(
7375
"kind": "error-embed",
7476
},
7577
)
76-
return proxy_error_embed_request(request, dsn, url_name)
78+
return await proxy_error_embed_request(request, dsn, url_name)
7779

7880
if (
7981
request.resolver_match
Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from collections.abc import Callable
45
from typing import Any
56

6-
# from asgiref.sync import iscoroutinefunction, markcoroutinefunction
7+
from asgiref.sync import async_to_sync, iscoroutinefunction, markcoroutinefunction
78
from django.http.response import HttpResponseBase
89
from rest_framework.request import Request
910

@@ -14,17 +15,57 @@ class ApiGatewayMiddleware:
1415
"""Proxy requests intended for remote silos"""
1516

1617
async_capable = True
17-
sync_capable = False
18+
sync_capable = True
1819

1920
def __init__(self, get_response: Callable[[Request], HttpResponseBase]):
2021
self.get_response = get_response
21-
# if iscoroutinefunction(self.get_response):
22-
# markcoroutinefunction(self)
22+
if iscoroutinefunction(self.get_response):
23+
markcoroutinefunction(self)
2324

2425
def __call__(self, request: Request) -> HttpResponseBase:
26+
if iscoroutinefunction(self):
27+
return self.__acall__(request)
2528
return self.get_response(request)
2629

27-
async def process_view(
30+
async def __acall__(self, request: Request) -> HttpResponseBase:
31+
return await self.get_response(request)
32+
33+
def process_view(
34+
self,
35+
request: Request,
36+
view_func: Callable[..., HttpResponseBase],
37+
view_args: tuple[str],
38+
view_kwargs: dict[str, Any],
39+
) -> HttpResponseBase | None:
40+
return self._process_view_match(request, view_func, view_args, view_kwargs)
41+
42+
def _process_view_match(
43+
self,
44+
request: Request,
45+
view_func: Callable[..., HttpResponseBase],
46+
view_args: tuple[str],
47+
view_kwargs: dict[str, Any],
48+
):
49+
#: we check if we're in an async or sync runtime once, then
50+
# overwrite the method with the actual impl.
51+
try:
52+
asyncio.get_running_loop()
53+
method = self._process_view_inner
54+
except RuntimeError:
55+
method = self._process_view_sync
56+
setattr(self, "_process_view_match", method)
57+
return method(request, view_func, view_args, view_kwargs)
58+
59+
def _process_view_sync(
60+
self,
61+
request: Request,
62+
view_func: Callable[..., HttpResponseBase],
63+
view_args: tuple[str],
64+
view_kwargs: dict[str, Any],
65+
) -> HttpResponseBase | None:
66+
return async_to_sync(self._process_view_inner)(request, view_func, view_args, view_kwargs)
67+
68+
async def _process_view_inner(
2869
self,
2970
request: Request,
3071
view_func: Callable[..., HttpResponseBase],
@@ -34,5 +75,4 @@ async def process_view(
3475
proxy_response = await proxy_request_if_needed(request, view_func, view_kwargs)
3576
if proxy_response is not None:
3677
return proxy_response
37-
else:
38-
return None
78+
return None

src/sentry/hybridcloud/apigateway/proxy.py

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,19 @@
66

77
import asyncio
88
import logging
9-
from collections.abc import Generator
9+
from collections.abc import AsyncGenerator
1010
from urllib.parse import urljoin, urlparse
1111
from wsgiref.util import is_hop_by_hop
1212

13+
import httpx
14+
from asgiref.sync import sync_to_async
1315
from django.conf import settings
1416
from django.http import HttpRequest, HttpResponse, StreamingHttpResponse
1517
from django.http.response import HttpResponseBase
16-
from httpx import AsyncClient as HttpClient
17-
from httpx import TimeoutException
18-
from requests import Response as ExternalResponse
1918

20-
# from requests import request as external_request
21-
# from requests.exceptions import Timeout
2219
from sentry import options
2320
from sentry.api.exceptions import RequestTimeout
24-
from sentry.objectstore.endpoints.organization import ChunkedEncodingDecoder, get_raw_body
21+
from sentry.objectstore.endpoints.organization import get_raw_body_async
2522
from sentry.silo.util import (
2623
PROXY_APIGATEWAY_HEADER,
2724
PROXY_DIRECT_LOCATION_HEADER,
@@ -35,11 +32,11 @@
3532
get_cell_for_organization,
3633
)
3734
from sentry.utils import metrics
38-
from sentry.utils.http import BodyWithLength
35+
from sentry.utils.http import BodyAsyncWrapper
3936

4037
logger = logging.getLogger(__name__)
4138

42-
proxy_client = HttpClient()
39+
proxy_client = httpx.AsyncClient()
4340

4441
# Endpoints that handle uploaded files have higher timeouts configured
4542
# and we need to honor those timeouts when proxying.
@@ -59,21 +56,27 @@
5956
PROXY_CHUNK_SIZE = 512 * 1024
6057

6158

62-
def _parse_response(response: ExternalResponse, remote_url: str) -> StreamingHttpResponse:
63-
"""
64-
Convert the Responses class from requests into the drf Response
65-
"""
59+
async def _stream_response_and_close(response: httpx.Response) -> AsyncGenerator[bytes]:
60+
"""Yield chunks from an httpx response and close the connection when done."""
61+
try:
62+
async for chunk in response.aiter_bytes(PROXY_CHUNK_SIZE):
63+
yield chunk
64+
finally:
65+
await response.aclose()
66+
67+
68+
def _adapt_response(response: httpx.Response, remote_url: str) -> StreamingHttpResponse:
69+
"""Convert an httpx Response into a Django response."""
6670

67-
# def stream_response() -> Generator[bytes]:
68-
# yield from response.iter_content(PROXY_CHUNK_SIZE)
6971
new_headers = clean_outbound_headers(response.headers)
72+
content_type = new_headers.pop("Content-Type", None)
7073

7174
streamed_response = StreamingHttpResponse(
72-
streaming_content=response.aiter_bytes(),
75+
streaming_content=_stream_response_and_close(response),
7376
status=response.status_code,
74-
content_type=new_headers.pop("Content-Type", None),
77+
content_type=content_type,
7578
)
76-
# Add Headers to response
79+
7780
for header, value in new_headers.items():
7881
if not is_hop_by_hop(header):
7982
streamed_response[header] = value
@@ -82,13 +85,20 @@ def _parse_response(response: ExternalResponse, remote_url: str) -> StreamingHtt
8285
return streamed_response
8386

8487

88+
async def _stream_request(body) -> AsyncGenerator[bytes]:
89+
async for chunk in body:
90+
yield chunk
91+
92+
8593
async def proxy_request(
86-
request: HttpRequest, org_id_or_slug: str, url_name: str
94+
request: HttpRequest,
95+
org_id_or_slug: str,
96+
url_name: str,
8797
) -> HttpResponseBase:
8898
"""Take a django request object and proxy it to a remote location given an org_id_or_slug"""
8999

90100
try:
91-
cell = get_cell_for_organization(org_id_or_slug)
101+
cell = await sync_to_async(get_cell_for_organization)(org_id_or_slug)
92102
except CellResolutionError as e:
93103
logger.info("region_resolution_error", extra={"org_slug": org_id_or_slug, "error": str(e)})
94104
return HttpResponse(status=404)
@@ -128,7 +138,9 @@ async def proxy_error_embed_request(
128138

129139

130140
async def proxy_cell_request(
131-
request: HttpRequest, cell: Cell, url_name: str
141+
request: HttpRequest,
142+
cell: Cell,
143+
url_name: str,
132144
) -> StreamingHttpResponse:
133145
"""Take a django request object and proxy it to a cell silo"""
134146
target_url = urljoin(cell.address, request.path)
@@ -137,7 +149,6 @@ async def proxy_cell_request(
137149
header_dict = clean_proxy_headers(request.headers)
138150
header_dict[PROXY_APIGATEWAY_HEADER] = "true"
139151

140-
# TODO: use requests session for connection pooling capabilities
141152
assert request.method is not None
142153
query_params = request.GET
143154

@@ -148,31 +159,26 @@ async def proxy_cell_request(
148159
if settings.APIGATEWAY_PROXY_SKIP_RELAY and request.path.startswith("/api/0/relays/"):
149160
return StreamingHttpResponse(streaming_content="relay proxy skipped", status=404)
150161

151-
data: bytes | Generator[bytes] | ChunkedEncodingDecoder | BodyWithLength | None = None
162+
data: AsyncGenerator[bytes] | None = None
152163
if url_name == "sentry-api-0-organization-objectstore":
153164
if content_encoding:
154165
header_dict["Content-Encoding"] = content_encoding
155-
data = get_raw_body(request)
166+
data = get_raw_body_async(request)
156167
else:
157-
data = BodyWithLength(request)
168+
data = BodyAsyncWrapper(request.body)
158169

159170
try:
160171
with metrics.timer("apigateway.proxy_request.duration", tags=metric_tags):
161-
async with proxy_client.stream(
172+
req = proxy_client.build_request(
162173
request.method,
163174
target_url,
164175
headers=header_dict,
165176
params=dict(query_params) if query_params is not None else None,
166-
data=data,
177+
content=_stream_request(data),
167178
timeout=timeout,
168-
follow_redirects=False,
169-
) as resp:
170-
return _parse_response(resp, target_url)
171-
except (TimeoutException, asyncio.CancelledError):
179+
)
180+
resp = await proxy_client.send(req, stream=True, follow_redirects=False)
181+
return _adapt_response(resp, target_url)
182+
except (httpx.TimeoutException, asyncio.CancelledError):
172183
# remote silo timeout. Use DRF timeout instead
173184
raise RequestTimeout()
174-
175-
# new_headers = clean_outbound_headers(resp.headers)
176-
# resp.headers.clear()
177-
# resp.headers.update(new_headers)
178-
# return _parse_response(resp, target_url)

src/sentry/objectstore/endpoints/organization.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
from __future__ import annotations
22

3-
from collections.abc import Callable, Generator
3+
from collections.abc import AsyncGenerator, Callable, Generator
44
from typing import Any
55
from urllib.parse import urlparse
66
from wsgiref.util import is_hop_by_hop
77

88
import requests
9+
from asgiref.sync import sync_to_async
910
from django.http import HttpRequest, StreamingHttpResponse
1011
from requests import Response as ExternalResponse
1112
from rest_framework.request import Request
1213
from rest_framework.response import Response
1314

14-
from sentry.utils.http import BodyWithLength
15+
from sentry.utils.http import BodyAsyncWrapper, BodyWithLength, BodyWithLengthAiter
1516

1617
# TODO(granian): Remove this and related code paths when we fully switch from uwsgi to granian
1718
uwsgi: Any = None
@@ -150,6 +151,32 @@ def stream_generator():
150151
return BodyWithLength(request)
151152

152153

154+
def get_raw_body_async(
155+
request: HttpRequest,
156+
) -> AsyncGenerator[bytes] | ChunkedEncodingAsyncDecoder | BodyWithLengthAiter | None:
157+
if request.body:
158+
return BodyAsyncWrapper(request.body)
159+
160+
wsgi_input = request.META.get("wsgi.input")
161+
if "granian" in request.META.get("SERVER_SOFTWARE", "").lower():
162+
return BodyAsyncWrapper(wsgi_input)
163+
164+
# wsgiref will raise an exception and hang when attempting to read wsgi.input while there's no body.
165+
# For now, support bodies only on PUT and POST requests when not using Granian.
166+
if request.method not in ("PUT", "POST"):
167+
return None
168+
169+
# wsgiref (dev/test server)
170+
if (
171+
hasattr(wsgi_input, "_read")
172+
and request.headers.get("Transfer-Encoding", "").lower() == "chunked"
173+
):
174+
return ChunkedEncodingAsyncDecoder(wsgi_input._read) # type: ignore[union-attr]
175+
176+
# wsgiref and the request has been already proxied through control silo
177+
return BodyWithLength(request).__aiter__()
178+
179+
153180
def get_target_url(path: str) -> str:
154181
base = options.get("objectstore.config")["base_url"].rstrip("/")
155182
# `path` should be a relative path, only grab that part
@@ -248,3 +275,10 @@ def read(self, size: int = -1) -> bytes:
248275
raise ValueError("Malformed chunk encoded stream")
249276

250277
return b"".join(buffer)
278+
279+
280+
class ChunkedEncodingAsyncDecoder(ChunkedEncodingDecoder):
281+
async def __anext__(self) -> bytes:
282+
if self._done:
283+
raise StopIteration
284+
return await sync_to_async(self.read)()

0 commit comments

Comments
 (0)