From e09fafa02d086c7e67de7fd935c8e9c30a42d3b6 Mon Sep 17 00:00:00 2001 From: "LV Xinyu (BCSC/EPA4)" Date: Tue, 18 Feb 2025 19:52:05 +0800 Subject: [PATCH 1/3] add ecu_func_address and send_diagnostic_func() for DoIPClient --- doipclient/client.py | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/doipclient/client.py b/doipclient/client.py index d38b73b..494cfde 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -114,7 +114,6 @@ class DoIPClient: with ECU's over automotive ethernet. Certain parts of the specification would require threaded operation to maintain the time-based state described by the ISO document. However, in practice these are rarely important, particularly for use with UDS - especially with scripts that tend to go through instructions as fast as possible. - :param ecu_ip_address: This is the IP address of the target ECU. This should be a string representing an IPv4 address like "192.168.1.1" or an IPv6 address like "2001:db8::". Like the logical_address, if you don't know the value for your ECU, utilize the get_entity() or await_vehicle_announcement() method. @@ -124,6 +123,9 @@ class DoIPClient: either use the get_entity() method OR the await_vehicle_announcement() method and power cycle the ECU - it should identify itself on bootup. :type ecu_logical_address: int + :param ecu_func_address: The functional address of the target ECU. This should be an integer. According to the + specification, the correct range is 0x0001 to 0x0DFF ("VM specific"). + :type ecu_func_address: int :param tcp_port: The destination TCP port for DoIP data communication. By default this is 13400 for unsecure and 3496 when using TLS. :type tcp_port: int, optional @@ -158,6 +160,7 @@ def __init__( self, ecu_ip_address, ecu_logical_address, + ecu_func_address, tcp_port=TCP_DATA_UNSECURED, udp_port=UDP_DISCOVERY, activation_type=RoutingActivationRequest.ActivationType.Default, @@ -168,6 +171,7 @@ def __init__( auto_reconnect_tcp=False, ): self._ecu_logical_address = ecu_logical_address + self._ecu_func_address = ecu_func_address self._client_logical_address = client_logical_address self._client_ip_address = client_ip_address self._use_secure = use_secure @@ -731,6 +735,42 @@ def send_diagnostic(self, diagnostic_payload, timeout=A_PROCESSING_TIME): ) ) + def send_diagnostic_func(self, diagnostic_payload, timeout=A_PROCESSING_TIME): + """Send a raw diagnostic payload (ie: UDS) to the functional address of ECU. + + :param diagnostic_payload: UDS payload to transmit to the ECU + :type diagnostic_payload: bytearray + :raises IOError: DoIP negative acknowledgement received + """ + message = DiagnosticMessage( + self._client_logical_address, self._ecu_func_address, diagnostic_payload + ) + self.send_doip_message(message) + start_time = time.time() + while True: + ellapsed_time = time.time() - start_time + if timeout and ellapsed_time > timeout: + raise TimeoutError("Timed out waiting for diagnostic response") + if timeout: + result = self.read_doip(timeout=(timeout - ellapsed_time)) + else: + result = self.read_doip() + if type(result) == DiagnosticMessageNegativeAcknowledgement: + raise IOError( + "Diagnostic request rejected with negative acknowledge code: {}".format( + result.nack_code + ) + ) + elif type(result) == DiagnosticMessagePositiveAcknowledgement: + return + elif result: + logger.warning( + "Received unexpected DoIP message type {}. Ignoring".format( + type(result) + ) + ) + + def receive_diagnostic(self, timeout=None): """Receive a raw diagnostic payload (ie: UDS) from the ECU. From 9844765e667aff0f5bb349e3e5ef564b45b2d776 Mon Sep 17 00:00:00 2001 From: "LV Xinyu (BCSC/EPA4)" Date: Tue, 18 Feb 2025 19:56:28 +0800 Subject: [PATCH 2/3] add default value for ecu_func_addr --- doipclient/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doipclient/client.py b/doipclient/client.py index 494cfde..07496c6 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -160,7 +160,7 @@ def __init__( self, ecu_ip_address, ecu_logical_address, - ecu_func_address, + ecu_func_address=None, tcp_port=TCP_DATA_UNSECURED, udp_port=UDP_DISCOVERY, activation_type=RoutingActivationRequest.ActivationType.Default, From dd208c7f369773a36cc4fa2cad588716f84162a4 Mon Sep 17 00:00:00 2001 From: greendog Date: Mon, 3 Mar 2025 20:17:31 +0800 Subject: [PATCH 3/3] add new function send_diagnostic_to_adddress(), refactor send_diagnostic() and add unit test for the new function. --- doipclient/client.py | 43 +++++++------------------------------------ tests/test_client.py | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/doipclient/client.py b/doipclient/client.py index 07496c6..5446132 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -114,6 +114,7 @@ class DoIPClient: with ECU's over automotive ethernet. Certain parts of the specification would require threaded operation to maintain the time-based state described by the ISO document. However, in practice these are rarely important, particularly for use with UDS - especially with scripts that tend to go through instructions as fast as possible. + :param ecu_ip_address: This is the IP address of the target ECU. This should be a string representing an IPv4 address like "192.168.1.1" or an IPv6 address like "2001:db8::". Like the logical_address, if you don't know the value for your ECU, utilize the get_entity() or await_vehicle_announcement() method. @@ -123,9 +124,6 @@ class DoIPClient: either use the get_entity() method OR the await_vehicle_announcement() method and power cycle the ECU - it should identify itself on bootup. :type ecu_logical_address: int - :param ecu_func_address: The functional address of the target ECU. This should be an integer. According to the - specification, the correct range is 0x0001 to 0x0DFF ("VM specific"). - :type ecu_func_address: int :param tcp_port: The destination TCP port for DoIP data communication. By default this is 13400 for unsecure and 3496 when using TLS. :type tcp_port: int, optional @@ -160,7 +158,6 @@ def __init__( self, ecu_ip_address, ecu_logical_address, - ecu_func_address=None, tcp_port=TCP_DATA_UNSECURED, udp_port=UDP_DISCOVERY, activation_type=RoutingActivationRequest.ActivationType.Default, @@ -171,7 +168,6 @@ def __init__( auto_reconnect_tcp=False, ): self._ecu_logical_address = ecu_logical_address - self._ecu_func_address = ecu_func_address self._client_logical_address = client_logical_address self._client_ip_address = client_ip_address self._use_secure = use_secure @@ -707,43 +703,19 @@ def send_diagnostic(self, diagnostic_payload, timeout=A_PROCESSING_TIME): :type diagnostic_payload: bytearray :raises IOError: DoIP negative acknowledgement received """ - message = DiagnosticMessage( - self._client_logical_address, self._ecu_logical_address, diagnostic_payload - ) - self.send_doip_message(message) - start_time = time.time() - while True: - ellapsed_time = time.time() - start_time - if timeout and ellapsed_time > timeout: - raise TimeoutError("Timed out waiting for diagnostic response") - if timeout: - result = self.read_doip(timeout=(timeout - ellapsed_time)) - else: - result = self.read_doip() - if type(result) == DiagnosticMessageNegativeAcknowledgement: - raise IOError( - "Diagnostic request rejected with negative acknowledge code: {}".format( - result.nack_code - ) - ) - elif type(result) == DiagnosticMessagePositiveAcknowledgement: - return - elif result: - logger.warning( - "Received unexpected DoIP message type {}. Ignoring".format( - type(result) - ) - ) + self.send_diagnostic_to_address(self._ecu_logical_address, diagnostic_payload, timeout) - def send_diagnostic_func(self, diagnostic_payload, timeout=A_PROCESSING_TIME): - """Send a raw diagnostic payload (ie: UDS) to the functional address of ECU. + def send_diagnostic_to_address(self, address, diagnostic_payload, timeout=A_PROCESSING_TIME): + """Send a raw diagnostic payload (ie: UDS) to the specified address. + :param address: The logical address to send the diagnostic payload to + :type address: int :param diagnostic_payload: UDS payload to transmit to the ECU :type diagnostic_payload: bytearray :raises IOError: DoIP negative acknowledgement received """ message = DiagnosticMessage( - self._client_logical_address, self._ecu_func_address, diagnostic_payload + self._client_logical_address, address, diagnostic_payload ) self.send_doip_message(message) start_time = time.time() @@ -770,7 +742,6 @@ def send_diagnostic_func(self, diagnostic_payload, timeout=A_PROCESSING_TIME): ) ) - def receive_diagnostic(self, timeout=None): """Receive a raw diagnostic payload (ie: UDS) from the ECU. diff --git a/tests/test_client.py b/tests/test_client.py index 080c91f..305a07a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -99,6 +99,9 @@ diagnostic_request = bytearray( [int(x, 16) for x in "02 fd 80 01 00 00 00 07 0e 00 00 01 00 01 02".split(" ")] ) +diagnostic_request_to_address = bytearray( + [int(x, 16) for x in "02 fd 80 01 00 00 00 07 0e 00 12 34 00 01 02".split(" ")] +) unknown_mercedes_message = bytearray( [ int(x, 16) @@ -500,6 +503,22 @@ def test_send_diagnostic_negative(mock_socket): assert mock_socket.tx_queue[-1] == diagnostic_request +def test_send_diagnostic_to_address_positive(mock_socket): + sut = DoIPClient(test_ip, test_logical_address) + mock_socket.rx_queue.append(diagnostic_positive_response) + assert None == sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2])) + assert mock_socket.tx_queue[-1] == diagnostic_request_to_address + +def test_send_diagnostic_to_address_negative(mock_socket): + sut = DoIPClient(test_ip, test_logical_address) + mock_socket.rx_queue.append(diagnostic_negative_response) + with pytest.raises( + IOError, match=r"Diagnostic request rejected with negative acknowledge code" + ): + result = sut.send_diagnostic_to_address(0x1234, bytearray([0, 1, 2])) + assert mock_socket.tx_queue[-1] == diagnostic_request_to_address + + def test_receive_diagnostic(mock_socket): sut = DoIPClient(test_ip, test_logical_address) mock_socket.rx_queue.append(diagnostic_result)