diff --git a/src/ahttpx/_parsers.py b/src/ahttpx/_parsers.py index 371b870201..2c0f153cdb 100644 --- a/src/ahttpx/_parsers.py +++ b/src/ahttpx/_parsers.py @@ -224,6 +224,13 @@ async def send_body(self, body: bytes) -> None: # Handle body close self.send_state = State.DONE + async def wait_ready(self) -> bool: + """ + Wait until read data starts arriving, and return `True`. + Return `False` if the stream closes. + """ + return await self.parser.wait_ready() + async def recv_method_line(self) -> tuple[bytes, bytes, bytes]: """ Receive the initial request method line: @@ -453,6 +460,15 @@ def _push_back(self, buffer): assert self._buffer == b'' self._buffer = buffer + async def wait_ready(self) -> bool: + """ + Attempt a read, and return True if read succeeds or False if the + stream is closed. The data remains in the read buffer. + """ + data = await self._read_some() + self._push_back(data) + return data != b'' + async def read(self, size: int) -> bytes: """ Read and return up to 'size' bytes from the stream, with I/O buffering provided. diff --git a/src/ahttpx/_server.py b/src/ahttpx/_server.py index d2dafe99f7..219342e2db 100644 --- a/src/ahttpx/_server.py +++ b/src/ahttpx/_server.py @@ -32,9 +32,14 @@ def __init__(self, stream, endpoint): async def handle_requests(self): try: while not self._parser.is_closed(): + if not await self._parser.wait_ready(): + # Wait until we have read data, or return + # if the stream closes. + return + # Read the initial part of the request, + # and setup a stream for reading the body. method, url, headers = await self._recv_head() stream = HTTPStream(self._recv_body, self._reset) - # TODO: Handle endpoint exceptions async with Request(method, url, headers=headers, content=stream) as request: try: response = await self._endpoint(request) @@ -50,7 +55,10 @@ async def handle_requests(self): await self._send_head(response) await self._send_body(response) if self._parser.is_keepalive(): + # If the client hasn't read the request body to + # completion, then do that here. await stream.read() + # Either revert to idle, or close the connection. await self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) diff --git a/src/httpx/_parsers.py b/src/httpx/_parsers.py index 1e92f5347c..d275a11192 100644 --- a/src/httpx/_parsers.py +++ b/src/httpx/_parsers.py @@ -224,6 +224,13 @@ def send_body(self, body: bytes) -> None: # Handle body close self.send_state = State.DONE + def wait_ready(self) -> bool: + """ + Wait until read data starts arriving, and return `True`. + Return `False` if the stream closes. + """ + return self.parser.wait_ready() + def recv_method_line(self) -> tuple[bytes, bytes, bytes]: """ Receive the initial request method line: @@ -453,6 +460,15 @@ def _push_back(self, buffer): assert self._buffer == b'' self._buffer = buffer + def wait_ready(self) -> bool: + """ + Attempt a read, and return True if read succeeds or False if the + stream is closed. The data remains in the read buffer. + """ + data = self._read_some() + self._push_back(data) + return data != b'' + def read(self, size: int) -> bytes: """ Read and return up to 'size' bytes from the stream, with I/O buffering provided. diff --git a/src/httpx/_server.py b/src/httpx/_server.py index 44ec3bff3e..d5a88050ee 100644 --- a/src/httpx/_server.py +++ b/src/httpx/_server.py @@ -32,9 +32,14 @@ def __init__(self, stream, endpoint): def handle_requests(self): try: while not self._parser.is_closed(): + if not self._parser.wait_ready(): + # Wait until we have read data, or return + # if the stream closes. + return + # Read the initial part of the request, + # and setup a stream for reading the body. method, url, headers = self._recv_head() stream = HTTPStream(self._recv_body, self._reset) - # TODO: Handle endpoint exceptions with Request(method, url, headers=headers, content=stream) as request: try: response = self._endpoint(request) @@ -50,7 +55,10 @@ def handle_requests(self): self._send_head(response) self._send_body(response) if self._parser.is_keepalive(): + # If the client hasn't read the request body to + # completion, then do that here. stream.read() + # Either revert to idle, or close the connection. self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) @@ -102,10 +110,7 @@ def __init__(self, host, port): def wait(self): while(True): - try: - sleep(1) - except KeyboardInterrupt: - break + sleep(1) @contextlib.contextmanager