Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/ahttpx/_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ async def send_body(self, body: bytes) -> None:
# Handle body close
self.send_state = State.DONE

async def wait_ready(self) -> bool:
"""
Wait until read data starts arriving, and return `True`.
Return `False` if the stream closes.
"""
return await self.parser.wait_ready()

async def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
"""
Receive the initial request method line:
Expand Down Expand Up @@ -453,6 +460,15 @@ def _push_back(self, buffer):
assert self._buffer == b''
self._buffer = buffer

async def wait_ready(self) -> bool:
"""
Attempt a read, and return True if read succeeds or False if the
stream is closed. The data remains in the read buffer.
"""
data = await self._read_some()
self._push_back(data)
return data != b''

async def read(self, size: int) -> bytes:
"""
Read and return up to 'size' bytes from the stream, with I/O buffering provided.
Expand Down
10 changes: 9 additions & 1 deletion src/ahttpx/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ def __init__(self, stream, endpoint):
async def handle_requests(self):
try:
while not self._parser.is_closed():
if not await self._parser.wait_ready():
# Wait until we have read data, or return
# if the stream closes.
return
# Read the initial part of the request,
# and setup a stream for reading the body.
method, url, headers = await self._recv_head()
stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
async with Request(method, url, headers=headers, content=stream) as request:
try:
response = await self._endpoint(request)
Expand All @@ -50,7 +55,10 @@ async def handle_requests(self):
await self._send_head(response)
await self._send_body(response)
if self._parser.is_keepalive():
# If the client hasn't read the request body to
# completion, then do that here.
await stream.read()
# Either revert to idle, or close the connection.
await self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)
Expand Down
16 changes: 16 additions & 0 deletions src/httpx/_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,13 @@ def send_body(self, body: bytes) -> None:
# Handle body close
self.send_state = State.DONE

def wait_ready(self) -> bool:
"""
Wait until read data starts arriving, and return `True`.
Return `False` if the stream closes.
"""
return self.parser.wait_ready()

def recv_method_line(self) -> tuple[bytes, bytes, bytes]:
"""
Receive the initial request method line:
Expand Down Expand Up @@ -453,6 +460,15 @@ def _push_back(self, buffer):
assert self._buffer == b''
self._buffer = buffer

def wait_ready(self) -> bool:
"""
Attempt a read, and return True if read succeeds or False if the
stream is closed. The data remains in the read buffer.
"""
data = self._read_some()
self._push_back(data)
return data != b''

def read(self, size: int) -> bytes:
"""
Read and return up to 'size' bytes from the stream, with I/O buffering provided.
Expand Down
15 changes: 10 additions & 5 deletions src/httpx/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ def __init__(self, stream, endpoint):
def handle_requests(self):
try:
while not self._parser.is_closed():
if not self._parser.wait_ready():
# Wait until we have read data, or return
# if the stream closes.
return
# Read the initial part of the request,
# and setup a stream for reading the body.
method, url, headers = self._recv_head()
stream = HTTPStream(self._recv_body, self._reset)
# TODO: Handle endpoint exceptions
with Request(method, url, headers=headers, content=stream) as request:
try:
response = self._endpoint(request)
Expand All @@ -50,7 +55,10 @@ def handle_requests(self):
self._send_head(response)
self._send_body(response)
if self._parser.is_keepalive():
# If the client hasn't read the request body to
# completion, then do that here.
stream.read()
# Either revert to idle, or close the connection.
self._reset()
except Exception:
logger.error("Internal Server Error", exc_info=True)
Expand Down Expand Up @@ -102,10 +110,7 @@ def __init__(self, host, port):

def wait(self):
while(True):
try:
sleep(1)
except KeyboardInterrupt:
break
sleep(1)


@contextlib.contextmanager
Expand Down