Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion framework/deproxy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(
conn_addr: Optional[str],
is_ssl: bool,
server_hostname: str,
is_http2: bool = None,
):
# Initialize the `BaseDeproxy`
super().__init__(
Expand All @@ -64,7 +65,11 @@ def __init__(
)

self.ssl = is_ssl
self._is_http2 = isinstance(self, DeproxyClientH2)
self._is_http2 = is_http2

if self._is_http2 is None:
self._is_http2 = isinstance(self, DeproxyClientH2)

self._create_context()
self.server_hostname = server_hostname

Expand Down Expand Up @@ -459,6 +464,12 @@ class ReqBodyBuffer:


class DeproxyClientH2(BaseDeproxyClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.h2_connection: Optional[h2.connection.H2Connection] = None
self.encoder = None

@property
def ping_received(self) -> int:
return self._ping_received
Expand Down
45 changes: 28 additions & 17 deletions framework/deproxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import time
from typing import Optional

import run_config
from framework import stateful
from framework.deproxy_base import BaseDeproxy
from helpers import deproxy, error, tempesta, tf_cfg, util
Expand Down Expand Up @@ -36,6 +35,7 @@ def __init__(self, *, server: "StaticDeproxyServer", sock: socket.socket):
self._cur_responses_list = []
self.__time_to_send: float = 0
self.__new_response: bool = True
self.last_request: Optional[deproxy.Request] = None
self.nrreq: int = 0

self._tcp_logger.debug("New server connection")
Expand Down Expand Up @@ -91,7 +91,7 @@ def handle_read(self):

while self._request_buffer:
try:
request = deproxy.Request(self._request_buffer)
self.last_request = deproxy.Request(self._request_buffer)
self.nrreq += 1
except deproxy.IncompleteMessage:
return None
Expand All @@ -102,8 +102,8 @@ def handle_read(self):
return None

self._http_logger.info("Receive request")
self._http_logger.debug(request)
response, need_close = self._server.receive_request(request)
self._http_logger.debug(self.last_request)
response, need_close = self._server.receive_request(self.last_request)
if self._server.drop_conn_when_request_received:
self.handle_close()
if response:
Expand All @@ -116,18 +116,21 @@ def handle_read(self):

if need_close:
self.close()
self._request_buffer = self._request_buffer[len(request.msg) :]
self._request_buffer = self._request_buffer[len(self.last_request.msg) :]
# Handler will be called even if buffer is empty.
else:
return None

def send_data(self, request: deproxy.Request, data: bytes) -> int:
return self.socket.send(data)

def handle_write(self):
if self._server.delay_before_sending_response and self.__new_response:
self.__new_response = False
return self.sleep()

resp = self._response_buffer[self._responses_done]
sent = self.socket.send(resp[: self._server.segment_size])
sent = self.send_data(self.last_request, resp[: self._server.segment_size])

if sent < 0:
return
Expand Down Expand Up @@ -188,19 +191,27 @@ def _reinit_variables(self):
self._connections: list[ServerConnection] = list()
self._requests: list[deproxy.Request] = list()

def create_new_connection(self, sock):
return ServerConnection(server=self, sock=sock)

def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, _ = pair
if self.segment_size:
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
handler = ServerConnection(server=self, sock=sock)
self._connections.append(handler)
# ATTENTION
# Due to the polling cycle, creating new connection can be
# performed before removing old connection.
# So we can have case with > expected amount of connections
# It's not a error case, it's a problem of polling

if pair is None:
return

sock, _ = pair

if self.segment_size:
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)

handler = self.create_new_connection(sock=sock)
self._connections.append(handler)
# ATTENTION
# Due to the polling cycle, creating new connection can be
# performed before removing old connection.
# So we can have case with > expected amount of connections
# It's not a error case, it's a problem of polling

def handle_error(self):
type_, v, _ = sys.exc_info()
Expand Down
4 changes: 0 additions & 4 deletions framework/nginx_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,13 @@ def __init__(self, id_, props):
self.conns_n = tempesta.server_conns_default()
self.active_conns = 0
self.requests = 0
self.name = id_
self.status_uri = fill_template(props["status_uri"], props)
self.stop_procedures = [self.stop_nginx, self.remove_config]
self.weight = int(props["weight"]) if "weight" in props else None
self.port_checker = port_checks.FreePortsChecker()

self.clear_stats()

def get_name(self):
return self.name

def clear_stats(self):
self.active_conns = 0
self.requests = 0
Expand Down
7 changes: 7 additions & 0 deletions framework/stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Stateful(abc.ABC):
def __init__(self, *, id_: str):
self._state = STATE_STOPPED
self.stop_procedures = []
self.id_ = id_
self._exceptions = []
self._generate_service_id(id_)
self._logger = logging.LoggerAdapter(
Expand All @@ -34,6 +35,12 @@ def _generate_service_id(self, id_: str) -> None:
def __str__(self):
return f"{self.__class__.__name__}"

def get_name(self):
return self.id_

def get_id(self):
return self.id_

@property
def state(self) -> str:
return self._state
Expand Down
3 changes: 3 additions & 0 deletions http2_general/test_h2_block_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
__copyright__ = "Copyright (C) 2023 Tempesta Technologies, Inc."
__license__ = "GPL2"

import time
from h2.errors import ErrorCodes
from hpack import HeaderTuple

from helpers import util
from http2_general.helpers import BlockActionH2Base, H2Base, generate_custom_error_page
from test_suite import marks
from helpers.deproxy import HttpMessage


@marks.parameterize_class(
Expand Down
Loading