diff --git a/.gitignore b/.gitignore index b6e4761..2ea6b6f 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,6 @@ dmypy.json # Pyre type checker .pyre/ + +# Pycharm project settings +.idea/ \ No newline at end of file diff --git a/doipclient/client.py b/doipclient/client.py index 8febfee..c3ef2a3 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -145,10 +145,10 @@ class DoIPClient: :param use_secure: Enables TLS. If set to True, a default SSL context is used. For more control, a preconfigured SSL context can be passed directly. Untested. Should be combined with changing tcp_port to 3496. :type use_secure: Union[bool,ssl.SSLContext] - :param log_level: Logging level - :type log_level: int :param auto_reconnect_tcp: Attempt to automatically reconnect TCP sockets that were closed by peer :type auto_reconnect_tcp: bool + :param vm_specific: Optional 4 byte long int + :type vm_specific: int, optional :raises ConnectionRefusedError: If the activation request fails :raises ValueError: If the IPAddress is neither an IPv4 nor an IPv6 address @@ -166,6 +166,7 @@ def __init__( client_ip_address=None, use_secure=False, auto_reconnect_tcp=False, + vm_specific=None ): self._ecu_logical_address = ecu_logical_address self._client_logical_address = client_logical_address @@ -180,6 +181,7 @@ def __init__( self._protocol_version = protocol_version self._auto_reconnect_tcp = auto_reconnect_tcp self._tcp_close_detected = False + self.vm_specific = vm_specific # Check the ECU IP type to determine socket family # Will raise ValueError if neither a valid IPv4, nor IPv6 address @@ -587,16 +589,19 @@ def request_activation( :type disable_retry: bool, optional :return: The resulting activation response object :rtype: RoutingActivationResponse + :raises ValueError: vm_specific is invalid or out of range """ message = RoutingActivationRequest( - self._client_logical_address, activation_type, vm_specific=vm_specific + self._client_logical_address, + activation_type, + vm_specific=self._validate_vm_specific_value(vm_specific) if vm_specific else self.vm_specific, ) self.send_doip_message(message, disable_retry=disable_retry) while True: result = self.read_doip() - if type(result) == RoutingActivationResponse: + if isinstance(result, RoutingActivationResponse): return result - elif result: + if result: logger.warning( "Received unexpected DoIP message type {}. Ignoring".format( type(result) @@ -824,3 +829,39 @@ def reconnect(self, close_delay=A_PROCESSING_TIME): raise ConnectionRefusedError( f"Activation Request failed with code {result.response_code}" ) + + @staticmethod + def _validate_vm_specific_value(value): + """Validate the VM specific value (must be > 0 and <= 0xffffffff) or None. + If the conditions are not fulfilled, raises an exception. + + :param value: The value to check. + :type value: int, optional + :return: The input value if valid. + :rtype: int or None + :raises ValueError: If the value is invalid or out of range. + """ + if not isinstance(value, int) and value is not None: + raise ValueError("Invalid vm_specific type must be int or None") + if isinstance(value, int) and (value < 0 or value > 0xffffffff): + raise ValueError("Invalid vm_specific value must be > 0 and <= 0xffffffff") + return value + + @property + def vm_specific(self): + """Get the optional OEM specific field value if set. + + :return: vm_specific value + :rtype: int, optional + """ + return self._vm_specific + + @vm_specific.setter + def vm_specific(self, value): + """Set the optional OEM specific field value. If you do not need to send this item, set it to None. + + :param value: The vm_specific value (must be > 0 and <= 0xffffffff) or None + :type value: int, optional + :raises ValueError: Value is invalid or out of range + """ + self._vm_specific = self._validate_vm_specific_value(value) diff --git a/setup.py b/setup.py index a1b3745..01f9a4b 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name="doipclient", - version="1.1.6", + version="1.1.7.dev1", description="A Diagnostic over IP (DoIP) client implementing ISO-13400-2.", long_description=long_description, long_description_content_type="text/x-rst", diff --git a/tests/test_client.py b/tests/test_client.py index d85e6b6..f70c47e 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -750,3 +750,60 @@ def test_use_secure_with_external_ssl_context(mock_socket, mocker): mocked_external_wrap_socket = mocked_external_context.wrap_socket mocked_external_wrap_socket.assert_called_once_with(mock_socket) + + +@pytest.mark.parametrize( + "vm_specific, exc", + ((None, False), (0, False), (-1, True), (0xFFFFFFFF, False), (0x100000000, True), ("0x1", True), (10.0, True))) +def test_vm_specific_setter(mock_socket, mocker, vm_specific, exc): + sut = DoIPClient(test_ip, test_logical_address, auto_reconnect_tcp=True) + if exc: + with pytest.raises(ValueError): + sut.vm_specific = vm_specific + else: + sut.vm_specific = vm_specific + assert sut.vm_specific == vm_specific + + +def test_vm_specific_static_value(mock_socket, mocker): + request_activation_spy = mocker.spy(DoIPClient, "request_activation") + mock_socket.rx_queue.append(successful_activation_response_with_vm) + + sut = DoIPClient( + test_ip, + test_logical_address, + auto_reconnect_tcp=True, + activation_type=None, + vm_specific=0x01020304, + ) + sut.request_activation(activation_type=RoutingActivationRequest.ActivationType.Default) + assert mock_socket.tx_queue[-1] == activation_request_with_vm + assert request_activation_spy.call_count == 1 + + +def test_vm_specific_request_activation_bad_value(mock_socket, mocker): + request_activation_spy = mocker.spy(DoIPClient, "request_activation") + mock_socket.rx_queue.append(successful_activation_response_with_vm) + + sut = DoIPClient( + test_ip, + test_logical_address, + auto_reconnect_tcp=True, + activation_type=None, + vm_specific=0x01020304, + ) + with pytest.raises(ValueError): + sut.request_activation(activation_type=RoutingActivationRequest.ActivationType.Default, vm_specific=-1) + + +def test_vm_specific_verification_in_init(mock_socket, mocker): + request_activation_spy = mocker.spy(DoIPClient, "request_activation") + mock_socket.rx_queue.append(successful_activation_response_with_vm) + with pytest.raises(ValueError): + sut = DoIPClient( + test_ip, + test_logical_address, + auto_reconnect_tcp=True, + activation_type=None, + vm_specific=-1, + ) \ No newline at end of file