diff --git a/CHANGES/10037.misc.rst b/CHANGES/10037.misc.rst new file mode 100644 index 00000000000..655c804c995 --- /dev/null +++ b/CHANGES/10037.misc.rst @@ -0,0 +1 @@ +Improved performances of creating objects during the HTTP request lifecycle -- by :user:`bdraco`. diff --git a/aiohttp/client_proto.py b/aiohttp/client_proto.py index 006112bc6f4..6e628a7c2fe 100644 --- a/aiohttp/client_proto.py +++ b/aiohttp/client_proto.py @@ -258,7 +258,7 @@ def data_received(self, data: bytes) -> None: if not data: return - # custom payload parser + # custom payload parser - currently always WebSocketReader if self._payload_parser is not None: eof, tail = self._payload_parser.feed_data(data) if eof: @@ -268,57 +268,56 @@ def data_received(self, data: bytes) -> None: if tail: self.data_received(tail) return - else: - if self._upgraded or self._parser is None: - # i.e. websocket connection, websocket parser is not set yet - self._tail += data + + if self._upgraded or self._parser is None: + # i.e. websocket connection, websocket parser is not set yet + self._tail += data + return + + # parse http messages + try: + messages, upgraded, tail = self._parser.feed_data(data) + except BaseException as underlying_exc: + if self.transport is not None: + # connection.release() could be called BEFORE + # data_received(), the transport is already + # closed in this case + self.transport.close() + # should_close is True after the call + if isinstance(underlying_exc, HttpProcessingError): + exc = HttpProcessingError( + code=underlying_exc.code, + message=underlying_exc.message, + headers=underlying_exc.headers, + ) else: - # parse http messages - try: - messages, upgraded, tail = self._parser.feed_data(data) - except BaseException as underlying_exc: - if self.transport is not None: - # connection.release() could be called BEFORE - # data_received(), the transport is already - # closed in this case - self.transport.close() - # should_close is True after the call - if isinstance(underlying_exc, HttpProcessingError): - exc = HttpProcessingError( - code=underlying_exc.code, - message=underlying_exc.message, - headers=underlying_exc.headers, - ) - else: - exc = HttpProcessingError() - self.set_exception(exc, underlying_exc) - return - - self._upgraded = upgraded - - payload: Optional[StreamReader] = None - for message, payload in messages: - if message.should_close: - self._should_close = True - - self._payload = payload - - if self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES: - self.feed_data((message, EMPTY_PAYLOAD)) - else: - self.feed_data((message, payload)) - if payload is not None: - # new message(s) was processed - # register timeout handler unsubscribing - # either on end-of-stream or immediately for - # EMPTY_PAYLOAD - if payload is not EMPTY_PAYLOAD: - payload.on_eof(self._drop_timeout) - else: - self._drop_timeout() + exc = HttpProcessingError() + self.set_exception(exc, underlying_exc) + return - if tail: - if upgraded: - self.data_received(tail) - else: - self._tail = tail + self._upgraded = upgraded + + payload: Optional[StreamReader] = None + for message, payload in messages: + if message.should_close: + self._should_close = True + + self._payload = payload + + if self._skip_payload or message.code in EMPTY_BODY_STATUS_CODES: + self.feed_data((message, EMPTY_PAYLOAD)) + else: + self.feed_data((message, payload)) + + if payload is not None: + # new message(s) was processed + # register timeout handler unsubscribing + # either on end-of-stream or immediately for + # EMPTY_PAYLOAD + if payload is not EMPTY_PAYLOAD: + payload.on_eof(self._drop_timeout) + else: + self._drop_timeout() + + if upgraded and tail: + self.data_received(tail) diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index 73309c61342..f2db7e85e50 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -802,7 +802,6 @@ def __init__( ) -> None: # URL forbids subclasses, so a simple type check is enough. assert type(url) is URL - super().__init__() self.method = method diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 9eddd829916..f04b74833bc 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -719,15 +719,12 @@ def ceil_timeout( class HeadersMixin: - __slots__ = ("_content_type", "_content_dict", "_stored_content_type") + """Mixin for handling headers.""" _headers: MultiMapping[str] - - def __init__(self) -> None: - super().__init__() - self._content_type: Optional[str] = None - self._content_dict: Optional[Dict[str, str]] = None - self._stored_content_type: Union[str, None, _SENTINEL] = sentinel + _content_type: Optional[str] = None + _content_dict: Optional[Dict[str, str]] = None + _stored_content_type: Union[str, None, _SENTINEL] = sentinel def _parse_content_type(self, raw: Optional[str]) -> None: self._stored_content_type = raw @@ -921,22 +918,14 @@ def __repr__(self) -> str: class CookieMixin: - # The `_cookies` slots is not defined here because non-empty slots cannot - # be combined with an Exception base class, as is done in HTTPException. - # CookieMixin subclasses with slots should define the `_cookies` - # slot themselves. - __slots__ = () + """Mixin for handling cookies.""" - def __init__(self) -> None: - super().__init__() - # Mypy doesn't like that _cookies isn't in __slots__. - # See the comment on this class's __slots__ for why this is OK. - self._cookies: Optional[SimpleCookie] = None # type: ignore[misc] + _cookies: Optional[SimpleCookie] = None @property def cookies(self) -> SimpleCookie: if self._cookies is None: - self._cookies = SimpleCookie() # type: ignore[misc] + self._cookies = SimpleCookie() return self._cookies def set_cookie( @@ -958,7 +947,7 @@ def set_cookie( Also updates only those params which are not None. """ if self._cookies is None: - self._cookies = SimpleCookie() # type: ignore[misc] + self._cookies = SimpleCookie() self._cookies[name] = value c = self._cookies[name] diff --git a/aiohttp/web_exceptions.py b/aiohttp/web_exceptions.py index 148eb22db3f..5fdd27695f5 100644 --- a/aiohttp/web_exceptions.py +++ b/aiohttp/web_exceptions.py @@ -96,7 +96,6 @@ def __init__( text: Optional[str] = None, content_type: Optional[str] = None, ) -> None: - super().__init__() if reason is None: reason = self.default_reason elif "\n" in reason: diff --git a/aiohttp/web_fileresponse.py b/aiohttp/web_fileresponse.py index e269638482c..daadda9a207 100644 --- a/aiohttp/web_fileresponse.py +++ b/aiohttp/web_fileresponse.py @@ -85,6 +85,10 @@ def __init__( self._path = pathlib.Path(path) self._chunk_size = chunk_size + def _seek_and_read(self, fobj: IO[Any], offset: int, chunk_size: int) -> bytes: + fobj.seek(offset) + return fobj.read(chunk_size) # type: ignore[no-any-return] + async def _sendfile_fallback( self, writer: AbstractStreamWriter, fobj: IO[Any], offset: int, count: int ) -> AbstractStreamWriter: @@ -93,10 +97,9 @@ async def _sendfile_fallback( chunk_size = self._chunk_size loop = asyncio.get_event_loop() - - await loop.run_in_executor(None, fobj.seek, offset) - - chunk = await loop.run_in_executor(None, fobj.read, chunk_size) + chunk = await loop.run_in_executor( + None, self._seek_and_read, fobj, offset, chunk_size + ) while chunk: await writer.write(chunk) count = count - chunk_size diff --git a/aiohttp/web_request.py b/aiohttp/web_request.py index e2fcbae73e0..15283b890bd 100644 --- a/aiohttp/web_request.py +++ b/aiohttp/web_request.py @@ -127,26 +127,8 @@ class BaseRequest(MutableMapping[str, Any], HeadersMixin): hdrs.METH_DELETE, } - __slots__ = ( - "_message", - "_protocol", - "_payload_writer", - "_payload", - "_headers", - "_method", - "_version", - "_rel_url", - "_post", - "_read_bytes", - "_state", - "_cache", - "_task", - "_client_max_size", - "_loop", - "_transport_sslcontext", - "_transport_peername", - "__weakref__", - ) + _post: Optional[MultiDictProxy[Union[str, bytes, FileField]]] = None + _read_bytes: Optional[bytes] = None def __init__( self, @@ -163,9 +145,6 @@ def __init__( host: Optional[str] = None, remote: Optional[str] = None, ) -> None: - super().__init__() - if state is None: - state = {} self._message = message self._protocol = protocol self._payload_writer = payload_writer @@ -189,20 +168,18 @@ def __init__( self._cache["scheme"] = url.scheme self._rel_url = url.relative() else: - self._rel_url = message.url + self._rel_url = url if scheme is not None: self._cache["scheme"] = scheme if host is not None: self._cache["host"] = host - self._post: Optional[MultiDictProxy[Union[str, bytes, FileField]]] = None - self._read_bytes: Optional[bytes] = None - self._state = state + self._state = {} if state is None else state self._task = task self._client_max_size = client_max_size self._loop = loop - transport = self._protocol.transport + transport = protocol.transport assert transport is not None self._transport_sslcontext = transport.get_extra_info("sslcontext") self._transport_peername = transport.get_extra_info("peername") @@ -838,16 +815,8 @@ def _finish(self) -> None: class Request(BaseRequest): - __slots__ = ("_match_info",) - - def __init__(self, *args: Any, **kwargs: Any) -> None: - super().__init__(*args, **kwargs) - - # matchdict, route_name, handler - # or information about traversal lookup - # initialized after route resolving - self._match_info: Optional[UrlMappingMatchInfo] = None + _match_info: Optional["UrlMappingMatchInfo"] = None def clone( self, diff --git a/aiohttp/web_response.py b/aiohttp/web_response.py index 02d0642afad..cb3e3717c66 100644 --- a/aiohttp/web_response.py +++ b/aiohttp/web_response.py @@ -76,28 +76,20 @@ class ContentCoding(enum.Enum): class StreamResponse(BaseClass, HeadersMixin, CookieMixin): - __slots__ = ( - "_length_check", - "_body", - "_keep_alive", - "_chunked", - "_compression", - "_compression_force", - "_compression_strategy", - "_req", - "_payload_writer", - "_eof_sent", - "_must_be_empty_body", - "_body_length", - "_state", - "_headers", - "_status", - "_reason", - "_cookies", - "__weakref__", - ) _body: Union[None, bytes, bytearray, Payload] + _length_check = True + _body = None + _keep_alive: Optional[bool] = None + _chunked: bool = False + _compression: bool = False + _compression_strategy: int = zlib.Z_DEFAULT_STRATEGY + _compression_force: Optional[ContentCoding] = None + _req: Optional["BaseRequest"] = None + _payload_writer: Optional[AbstractStreamWriter] = None + _eof_sent: bool = False + _must_be_empty_body: Optional[bool] = None + _body_length = 0 def __init__( self, @@ -114,20 +106,6 @@ def __init__( the headers when creating a new response object. It is not intended to be used by external code. """ - super().__init__() - self._length_check = True - self._body = None - self._keep_alive: Optional[bool] = None - self._chunked = False - self._compression = False - self._compression_strategy: int = zlib.Z_DEFAULT_STRATEGY - self._compression_force: Optional[ContentCoding] = None - - self._req: Optional[BaseRequest] = None - self._payload_writer: Optional[AbstractStreamWriter] = None - self._eof_sent = False - self._must_be_empty_body: Optional[bool] = None - self._body_length = 0 self._state: Dict[str, Any] = {} if _real_headers is not None: @@ -197,12 +175,11 @@ def body_length(self) -> int: def enable_chunked_encoding(self) -> None: """Enables automatic chunked transfer encoding.""" - self._chunked = True - if hdrs.CONTENT_LENGTH in self._headers: raise RuntimeError( "You can't enable chunked encoding when a content length is set" ) + self._chunked = True def enable_compression( self, @@ -410,8 +387,6 @@ async def _prepare_headers(self) -> None: if not self._must_be_empty_body: writer.enable_chunking() headers[hdrs.TRANSFER_ENCODING] = "chunked" - if hdrs.CONTENT_LENGTH in headers: - del headers[hdrs.CONTENT_LENGTH] elif self._length_check: # Disabled for WebSockets writer.length = self.content_length if writer.length is None: @@ -528,11 +503,8 @@ def __eq__(self, other: object) -> bool: class Response(StreamResponse): - __slots__ = ( - "_compressed_body", - "_zlib_executor_size", - "_zlib_executor", - ) + + _compressed_body: Optional[bytes] = None def __init__( self, @@ -598,7 +570,6 @@ def __init__( else: self.body = body - self._compressed_body: Optional[bytes] = None self._zlib_executor_size = zlib_executor_size self._zlib_executor = zlib_executor diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 2f0f809280b..4f104780c7d 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -64,33 +64,23 @@ def __bool__(self) -> bool: class WebSocketResponse(StreamResponse): - __slots__ = ( - "_protocols", - "_ws_protocol", - "_writer", - "_reader", - "_closed", - "_closing", - "_conn_lost", - "_close_code", - "_loop", - "_waiting", - "_close_wait", - "_exception", - "_timeout", - "_receive_timeout", - "_autoclose", - "_autoping", - "_heartbeat", - "_heartbeat_when", - "_heartbeat_cb", - "_pong_heartbeat", - "_pong_response_cb", - "_compress", - "_max_msg_size", - "_ping_task", - "_writer_limit", - ) + + _length_check: bool = False + _ws_protocol: Optional[str] = None + _writer: Optional[WebSocketWriter] = None + _reader: Optional[WebSocketDataQueue] = None + _closed: bool = False + _closing: bool = False + _conn_lost: int = 0 + _close_code: Optional[int] = None + _loop: Optional[asyncio.AbstractEventLoop] = None + _waiting: bool = False + _close_wait: Optional[asyncio.Future[None]] = None + _exception: Optional[BaseException] = None + _heartbeat_when: float = 0.0 + _heartbeat_cb: Optional[asyncio.TimerHandle] = None + _pong_response_cb: Optional[asyncio.TimerHandle] = None + _ping_task: Optional[asyncio.Task[None]] = None def __init__( self, @@ -106,32 +96,16 @@ def __init__( writer_limit: int = DEFAULT_LIMIT, ) -> None: super().__init__(status=101) - self._length_check = False self._protocols = protocols - self._ws_protocol: Optional[str] = None - self._writer: Optional[WebSocketWriter] = None - self._reader: Optional[WebSocketDataQueue] = None - self._closed = False - self._closing = False - self._conn_lost = 0 - self._close_code: Optional[int] = None - self._loop: Optional[asyncio.AbstractEventLoop] = None - self._waiting: bool = False - self._close_wait: Optional[asyncio.Future[None]] = None - self._exception: Optional[BaseException] = None self._timeout = timeout self._receive_timeout = receive_timeout self._autoclose = autoclose self._autoping = autoping self._heartbeat = heartbeat - self._heartbeat_when = 0.0 - self._heartbeat_cb: Optional[asyncio.TimerHandle] = None if heartbeat is not None: self._pong_heartbeat = heartbeat / 2.0 - self._pong_response_cb: Optional[asyncio.TimerHandle] = None self._compress: Union[bool, int] = compress self._max_msg_size = max_msg_size - self._ping_task: Optional[asyncio.Task[None]] = None self._writer_limit = writer_limit def _cancel_heartbeat(self) -> None: diff --git a/tests/test_client_proto.py b/tests/test_client_proto.py index 52065eca318..e5d62d1e467 100644 --- a/tests/test_client_proto.py +++ b/tests/test_client_proto.py @@ -75,6 +75,88 @@ async def test_uncompleted_message(loop: asyncio.AbstractEventLoop) -> None: assert dict(exc.message.headers) == {"Location": "http://python.org/"} +async def test_data_received_after_close(loop: asyncio.AbstractEventLoop) -> None: + proto = ResponseHandler(loop=loop) + transport = mock.Mock() + proto.connection_made(transport) + proto.set_response_params(read_until_eof=True) + proto.close() + assert transport.close.called + transport.close.reset_mock() + proto.data_received(b"HTTP\r\n\r\n") + assert proto.should_close + assert not transport.close.called + assert isinstance(proto.exception(), http.HttpProcessingError) + + +async def test_multiple_responses_one_byte_at_a_time( + loop: asyncio.AbstractEventLoop, +) -> None: + proto = ResponseHandler(loop=loop) + proto.connection_made(mock.Mock()) + conn = mock.Mock(protocol=proto) + proto.set_response_params(read_until_eof=True) + + for _ in range(2): + messages = ( + b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nab" + b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\ncd" + b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nef" + ) + for i in range(len(messages)): + proto.data_received(messages[i : i + 1]) + + expected = [b"ab", b"cd", b"ef"] + for payload in expected: + response = ClientResponse( + "get", + URL("http://def-cl-resp.org"), + writer=mock.Mock(), + continue100=None, + timer=TimerNoop(), + request_info=mock.Mock(), + traces=[], + loop=loop, + session=mock.Mock(), + ) + await response.start(conn) + await response.read() == payload + + +async def test_unexpected_exception_during_data_received( + loop: asyncio.AbstractEventLoop, +) -> None: + proto = ResponseHandler(loop=loop) + + class PatchableHttpResponseParser(http.HttpResponseParser): + """Subclass of HttpResponseParser to make it patchable.""" + + with mock.patch( + "aiohttp.client_proto.HttpResponseParser", PatchableHttpResponseParser + ): + proto.connection_made(mock.Mock()) + conn = mock.Mock(protocol=proto) + proto.set_response_params(read_until_eof=True) + proto.data_received(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nab") + response = ClientResponse( + "get", + URL("http://def-cl-resp.org"), + writer=mock.Mock(), + continue100=None, + timer=TimerNoop(), + request_info=mock.Mock(), + traces=[], + loop=loop, + session=mock.Mock(), + ) + await response.start(conn) + await response.read() == b"ab" + with mock.patch.object(proto._parser, "feed_data", side_effect=ValueError): + proto.data_received(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\ncd") + + assert isinstance(proto.exception(), http.HttpProcessingError) + + async def test_client_protocol_readuntil_eof(loop: asyncio.AbstractEventLoop) -> None: proto = ResponseHandler(loop=loop) transport = mock.Mock() diff --git a/tests/test_web_request.py b/tests/test_web_request.py index 08a091947a6..0086d9b1688 100644 --- a/tests/test_web_request.py +++ b/tests/test_web_request.py @@ -61,8 +61,6 @@ def test_base_ctor() -> None: assert req.keep_alive - assert "__dict__" not in dir(req) - assert req @@ -109,8 +107,6 @@ def test_ctor() -> None: assert req.raw_headers == ((b"FOO", b"bar"),) assert req.task is req._task - assert "__dict__" not in dir(req) - def test_doubleslashes() -> None: # NB: //foo/bar is an absolute URL with foo netloc and /bar path