diff --git a/yabgp/message/attribute/linkstate/__init__.py b/yabgp/message/attribute/linkstate/__init__.py index 1de595d..1dc118f 100644 --- a/yabgp/message/attribute/linkstate/__init__.py +++ b/yabgp/message/attribute/linkstate/__init__.py @@ -16,6 +16,7 @@ from .linkstate import LinkState # noqa from .node.local_router_id import LocalRouterID # noqa from .node.name import NodeName # noqa +from .node.flex_algo_define import FlexAlgorithmDefine # noqa from .node.isisarea import ISISArea # noqa from .node.sr_capabilities import SRCapabilities # noqa from .node.sr_algorithm import SRAlgorithm # noqa @@ -26,6 +27,7 @@ from .node.srlb import SRLB # noqa from .node.srv6_capabilities import SRv6Capabilities # noqa from .link.admingroup import AdminGroup # noqa +from .link.app_spec_link_attr import AppSpecLinkAttr # noqa from .link.remote_router_id import RemoteRouterID # noqa from .link.max_bw import MaxBandwidth # noqa from .link.max_rsv_bw import MaxResvBandwidth # noqa diff --git a/yabgp/message/attribute/linkstate/link/app_spec_link_attr.py b/yabgp/message/attribute/linkstate/link/app_spec_link_attr.py new file mode 100644 index 0000000..da41703 --- /dev/null +++ b/yabgp/message/attribute/linkstate/link/app_spec_link_attr.py @@ -0,0 +1,116 @@ +# Copyright 2025 Cisco Systems, Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import struct +import binascii +from yabgp.message.attribute.linkstate.linkstate import LinkState +from yabgp.tlv import TLV + + +@LinkState.register() +class AppSpecLinkAttr(TLV): + """ + Application-Specific Link Attributes TLV (Type 1122) + RFC 9294 Section 3: https://www.rfc-editor.org/rfc/rfc9294.html#section-3 + + Format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SABM Length | UDABM Length | Reserved | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Standard Application Identifier Bit Mask (variable) // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | User-Defined Application Identifier Bit Mask (variable) // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Link Attribute sub-TLVs // + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Fields: + - SABM Length (1 octet): Standard Application Identifier Bit Mask Length + - UDABM Length (1 octet): User-Defined Application Identifier Bit Mask Length + - Reserved (2 octets): Must be zero + - SABM (variable, 0/4/8 octets): Standard Application Identifier Bit Mask + - UDABM (variable, 0/4/8 octets): User-Defined Application Identifier Bit Mask + - Link Attribute sub-TLVs (variable): Application-specific link attributes + """ + TYPE = 1122 + TYPE_STR = 'ASLA' + + @classmethod + def unpack(cls, data): + """ + Unpack Application-Specific Link Attributes TLV + + :param data: binary data (without type and length) + :return: AppSpecLinkAttr instance + """ + # Parse fixed header (4 bytes) + sabm_len = data[0] + udabm_len = data[1] + # reserved = struct.unpack('!H', data[2:4])[0] + + offset = 4 + + # Parse SABM (Standard Application Identifier Bit Mask) + sabm = None + if sabm_len > 0: + sabm_bytes = data[offset:offset + sabm_len] + if sabm_len == 4: + sabm_int = struct.unpack('!I', sabm_bytes)[0] + sabm = '0x{:08x}'.format(sabm_int) + elif sabm_len == 8: + sabm_int = struct.unpack('!Q', sabm_bytes)[0] + sabm = '0x{:016x}'.format(sabm_int) + else: + sabm = '0x' + binascii.b2a_hex(sabm_bytes).decode('ascii') + offset += sabm_len + + # Parse UDABM (User-Defined Application Identifier Bit Mask) + udabm = None + if udabm_len > 0: + udabm_bytes = data[offset:offset + udabm_len] + if udabm_len == 4: + udabm_int = struct.unpack('!I', udabm_bytes)[0] + udabm = '0x{:08x}'.format(udabm_int) + elif udabm_len == 8: + udabm_int = struct.unpack('!Q', udabm_bytes)[0] + udabm = '0x{:016x}'.format(udabm_int) + else: + udabm = '0x' + binascii.b2a_hex(udabm_bytes).decode('ascii') + offset += udabm_len + + # Parse Link Attribute sub-TLVs + sub_tlvs_bin_data = data[offset:] + sub_tlvs = [] + + while sub_tlvs_bin_data: + sub_tlv_type, sub_tlv_len = struct.unpack('!HH', sub_tlvs_bin_data[:4]) + sub_tlv_value = sub_tlvs_bin_data[4:4 + sub_tlv_len] + + if sub_tlv_type in LinkState.registered_tlvs: + sub_tlvs.append(LinkState.registered_tlvs[sub_tlv_type].unpack(sub_tlv_value).dict()) + else: + sub_tlvs.append({ + 'type': sub_tlv_type, + 'value': str(binascii.b2a_hex(sub_tlv_value)) + }) + sub_tlvs_bin_data = sub_tlvs_bin_data[4 + sub_tlv_len:] + + return cls(value={ + 'sabm': {'len': sabm_len, 'value': sabm}, + 'udabm': {'len': udabm_len, 'value': udabm}, + 'sub_tlvs': sub_tlvs + }) diff --git a/yabgp/message/attribute/linkstate/node/flex_algo_define.py b/yabgp/message/attribute/linkstate/node/flex_algo_define.py new file mode 100644 index 0000000..17d24ea --- /dev/null +++ b/yabgp/message/attribute/linkstate/node/flex_algo_define.py @@ -0,0 +1,267 @@ +# Copyright 2025 Cisco Systems, Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import struct +import binascii + +from yabgp.message.attribute.linkstate.linkstate import LinkState +from yabgp.tlv import TLV + + +@LinkState.register() +class FlexAlgorithmDefine(TLV): + """ + Flex Algorithm Definition TLV (Type 1039) + RFC 9351 Section 3: https://www.rfc-editor.org/rfc/rfc9351.html#section-3 + + Format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Flex-Algo | Metric-Type | Calc-Type | Priority | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | | + + Sub-TLVs (variable) + + | | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Fields: + - Flex-Algo (1 octet): Flexible Algorithm number (128-255) + - Metric-Type (1 octet): 0=IGP Metric, 1=Min Unidirectional Link Delay, 2=TE Default Metric + - Calc-Type (1 octet): 0=SPF, 1=Strict SPF + - Priority (1 octet): Priority of the FAD definition (0-255) + - Sub-TLVs: Optional sub-TLVs (1040-1044) + """ + TYPE = 1039 + TYPE_STR = 'flex_algo_defn' + + @classmethod + def unpack(cls, data): + """ + Unpack Flex Algorithm Definition TLV + + :param data: binary data (without type and length) + :return: FlexAlgorithmDefine instance + """ + # Parse fixed fields (4 bytes) + flex_algo, metric_type, calc_type, priority = struct.unpack('!BBBB', data[:4]) + + # Parse sub-TLVs if present + sub_tlvs_bin_data = data[4:] + sub_tlvs = [] + + while sub_tlvs_bin_data: + sub_tlvs_type_code, sub_tlvs_length = struct.unpack('!HH', sub_tlvs_bin_data[:4]) + sub_tlvs_value = sub_tlvs_bin_data[4: 4 + sub_tlvs_length] + + if sub_tlvs_type_code in LinkState.registered_tlvs: + sub_tlvs.append(LinkState.registered_tlvs[sub_tlvs_type_code].unpack(sub_tlvs_value).dict()) + else: + sub_tlvs.append({ + 'type': sub_tlvs_type_code, + 'value': str(binascii.b2a_hex(sub_tlvs_value)) + }) + sub_tlvs_bin_data = sub_tlvs_bin_data[4 + sub_tlvs_length:] + + return cls(value={ + 'flex_algo': flex_algo, + 'metric_type': metric_type, + 'calc_type': calc_type, + 'priority': priority, + 'sub_tlvs': sub_tlvs + }) + + +@LinkState.register() +class FlexAlgoExcludeAdminGroup(TLV): + """ + Flex-Algorithm Exclude Admin Group TLV (Type 1040) + RFC 9351 Section 3.1: https://www.rfc-editor.org/rfc/rfc9351.html#section-3.1 + + Format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Extended Admin Group | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | ... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Contains Extended Admin Group bitmask (variable length, 4 octets per group). + Links with any bit set that matches any bit in this bitmask are excluded. + """ + TYPE = 1040 + TYPE_STR = 'flex_algo_excl_admin_group' + + @classmethod + def unpack(cls, data): + """ + Unpack Extended Admin Group + + :param data: binary data + :return: list of 32-bit admin group values + """ + admin_groups = [] + for i in range(0, len(data), 4): + admin_groups.append(struct.unpack('!I', data[i:i+4])[0]) + return cls(value=admin_groups) + + +@LinkState.register() +class FlexAlgoIncludeAnyAdminGroup(TLV): + """ + Flex-Algorithm Include-Any Admin Group TLV (Type 1041) + RFC 9351 Section 3.2: https://www.rfc-editor.org/rfc/rfc9351.html#section-3.2 + + Format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Extended Admin Group | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | ... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Contains Extended Admin Group bitmask (variable length, 4 octets per group). + Links with any bit set that matches any bit in this bitmask are included. + """ + TYPE = 1041 + TYPE_STR = 'flex_algo_incl_any_admin_group' + + @classmethod + def unpack(cls, data): + """ + Unpack Extended Admin Group + + :param data: binary data + :return: list of 32-bit admin group values + """ + admin_groups = [] + for i in range(0, len(data), 4): + admin_groups.append(struct.unpack('!I', data[i:i+4])[0]) + return cls(value=admin_groups) + + +@LinkState.register() +class FlexAlgoIncludeAllAdminGroup(TLV): + """ + Flex-Algorithm Include-All Admin Group TLV (Type 1042) + RFC 9351 Section 3.3: https://www.rfc-editor.org/rfc/rfc9351.html#section-3.3 + + + Format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Extended Admin Group | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | ... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Contains Extended Admin Group bitmask (variable length, 4 octets per group). + Links must have all bits set that match all bits in this bitmask to be included. + """ + TYPE = 1042 + TYPE_STR = 'flex_algo_incl_all_admin_group' + + @classmethod + def unpack(cls, data): + """ + Unpack Extended Admin Group + + :param data: binary data + :return: list of 32-bit admin group values + """ + admin_groups = [] + for i in range(0, len(data), 4): + admin_groups.append(struct.unpack('!I', data[i:i+4])[0]) + return cls(value=admin_groups) + + +@LinkState.register() +class FlexAlgoDefinitionFlags(TLV): + """ + Flex-Algorithm Definition Flags TLV (Type 1043) + RFC 9351 Section 3.4: https://www.rfc-editor.org/rfc/rfc9351.html#section-3.4 + + + Format: + 0 1 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |M| Reserved | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Flags: + M-flag (bit 0): If set, the Flex-Algorithm specific prefix and ASBR metric + MUST be used for inter-area and external prefix calculation. + Reserved: Must be zero on transmission and ignored on reception. + """ + TYPE = 1043 + TYPE_STR = 'flex_algo_defn_flags' + + @classmethod + def unpack(cls, data): + """ + Unpack Flex-Algorithm Definition Flags + + :param data: binary data (2 bytes) + :return: dict with flag values + """ + flags = struct.unpack('!H', data[:2])[0] + flag = { + 'M': flags >> 15 # M-flag is the most significant bit + } + return cls(value=flag) + + +@LinkState.register() +class FlexAlgoExcludeSRLG(TLV): + """ + Flex-Algorithm Exclude SRLG TLV (Type 1044) + RFC 9351 Section 3.5: https://www.rfc-editor.org/rfc/rfc9351.html#section-3.5 + + + Format: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SRLG Value 1 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SRLG Value 2 | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | ... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | SRLG Value N | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + Contains list of 32-bit SRLG (Shared Risk Link Group) values. + Links with any SRLG value matching any value in this list are excluded. + """ + TYPE = 1044 + TYPE_STR = 'flex_algo_excl_srlg' + + @classmethod + def unpack(cls, data): + """ + Unpack SRLG list + + :param data: binary data + :return: list of 32-bit SRLG values + """ + srlg_list = [] + for i in range(0, len(data), 4): + srlg_list.append(struct.unpack('!I', data[i:i+4])[0]) + return cls(value=srlg_list) diff --git a/yabgp/tests/unit/message/attribute/linkstate/link/test_app_spec_link_attr.py b/yabgp/tests/unit/message/attribute/linkstate/link/test_app_spec_link_attr.py new file mode 100644 index 0000000..34f1bba --- /dev/null +++ b/yabgp/tests/unit/message/attribute/linkstate/link/test_app_spec_link_attr.py @@ -0,0 +1,352 @@ +# Copyright 2025 Cisco Systems, Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" Test Application-Specific Link Attributes TLV """ + +import unittest + +from yabgp.message.attribute.linkstate.link.app_spec_link_attr import AppSpecLinkAttr + + +class TestAppSpecLinkAttr(unittest.TestCase): + """Test AppSpecLinkAttr TLV (Type 1122)""" + + def test_unpack_with_te_metric(self): + """Test unpack ASLA TLV with TE Metric sub-TLV + + Data: 04 04 00 00 10 00 00 00 00 00 00 00 04 44 00 04 ff ff ff ff + Header (4 bytes): + - SABM Length: 0x04 = 4 + - UDABM Length: 0x04 = 4 + - Reserved: 0x0000 + SABM (4 bytes): 0x10000000 + UDABM (4 bytes): 0x00000000 + Sub-TLV (8 bytes): + - Type: 0x0444 = 1092 (TE Default Metric) + - Length: 0x0004 = 4 + - Value: 0xffffffff = 4294967295 + """ + data_bin = bytes.fromhex('04040000100000000000000004440004ffffffff') + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 4, 'value': '0x10000000'}, + 'udabm': {'len': 4, 'value': '0x00000000'}, + 'sub_tlvs': [ + { + 'type': 'te_metric', + 'value': 4294967295 + } + ] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + def test_unpack_with_zero_length_masks(self): + """Test unpack ASLA TLV with zero-length SABM and UDABM + + Data: 00 00 00 00 04 44 00 04 00 00 00 64 + Header (4 bytes): + - SABM Length: 0x00 = 0 + - UDABM Length: 0x00 = 0 + - Reserved: 0x0000 + SABM: none (length = 0) + UDABM: none (length = 0) + Sub-TLV (8 bytes): + - Type: 0x0444 = 1092 (TE Default Metric) + - Length: 0x0004 = 4 + - Value: 0x00000064 = 100 + """ + data_bin = bytes.fromhex('000000000444000400000064') + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 0, 'value': None}, + 'udabm': {'len': 0, 'value': None}, + 'sub_tlvs': [ + { + 'type': 'te_metric', + 'value': 100 + } + ] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + def test_unpack_with_8_byte_masks(self): + """Test unpack ASLA TLV with 8-byte (64-bit) SABM and UDABM + + Data: 08 08 00 00 12 34 56 78 9a bc de f0 00 00 00 00 00 00 00 ff + Header (4 bytes): + - SABM Length: 0x08 = 8 + - UDABM Length: 0x08 = 8 + - Reserved: 0x0000 + SABM (8 bytes): 0x123456789abcdef0 + UDABM (8 bytes): 0x00000000000000ff + """ + data_bin = bytes.fromhex('08080000123456789abcdef000000000000000ff') + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 8, 'value': '0x123456789abcdef0'}, + 'udabm': {'len': 8, 'value': '0x00000000000000ff'}, + 'sub_tlvs': [] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + def test_unpack_with_only_sabm(self): + """Test unpack ASLA TLV with only SABM (UDABM length = 0) + + Data: 04 00 00 00 10 00 00 00 + Header (4 bytes): + - SABM Length: 0x04 = 4 + - UDABM Length: 0x00 = 0 + - Reserved: 0x0000 + SABM (4 bytes): 0x10000000 + UDABM: none (length = 0) + """ + data_bin = bytes.fromhex('0400000010000000') + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 4, 'value': '0x10000000'}, + 'udabm': {'len': 0, 'value': None}, + 'sub_tlvs': [] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + def test_unpack_with_only_udabm(self): + """Test unpack ASLA TLV with only UDABM (SABM length = 0) + + Data: 00 04 00 00 ab cd ef 12 + Header (4 bytes): + - SABM Length: 0x00 = 0 + - UDABM Length: 0x04 = 4 + - Reserved: 0x0000 + SABM: none (length = 0) + UDABM (4 bytes): 0xabcdef12 + """ + data_bin = bytes.fromhex('00040000abcdef12') + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 0, 'value': None}, + 'udabm': {'len': 4, 'value': '0xabcdef12'}, + 'sub_tlvs': [] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + def test_unpack_with_unknown_sub_tlv(self): + """Test unpack ASLA TLV with unknown sub-TLV type + + Data: 00 00 00 00 ff ff 00 04 de ad be ef + Header (4 bytes): + - SABM Length: 0x00 = 0 + - UDABM Length: 0x00 = 0 + - Reserved: 0x0000 + Sub-TLV (8 bytes): + - Type: 0xffff = 65535 (Unknown) + - Length: 0x0004 = 4 + - Value: 0xdeadbeef + """ + data_bin = bytes.fromhex('00000000ffff0004deadbeef') + + result = AppSpecLinkAttr.unpack(data_bin).dict() + + self.assertEqual(result['type'], 'ASLA') + self.assertEqual(result['value']['sabm'], {'len': 0, 'value': None}) + self.assertEqual(result['value']['udabm'], {'len': 0, 'value': None}) + self.assertEqual(len(result['value']['sub_tlvs']), 1) + self.assertEqual(result['value']['sub_tlvs'][0]['type'], 65535) + # Unknown sub-TLV value is returned as hex string + self.assertIn('deadbeef', result['value']['sub_tlvs'][0]['value'].lower()) + + def test_unpack_with_multiple_sub_tlvs(self): + """Test unpack ASLA TLV with multiple sub-TLVs + + Data: 04 04 00 00 10 00 00 00 00 00 00 00 + 04 44 00 04 00 00 00 0a + 04 44 00 04 00 00 00 14 + Header (4 bytes): + - SABM Length: 0x04 = 4 + - UDABM Length: 0x04 = 4 + - Reserved: 0x0000 + SABM (4 bytes): 0x10000000 + UDABM (4 bytes): 0x00000000 + Sub-TLV 1 (8 bytes): + - Type: 0x0444 = 1092 (TE Default Metric) + - Length: 0x0004 = 4 + - Value: 0x0000000a = 10 + Sub-TLV 2 (8 bytes): + - Type: 0x0444 = 1092 (TE Default Metric) + - Length: 0x0004 = 4 + - Value: 0x00000014 = 20 + """ + data_bin = bytes.fromhex( + '04040000' # Header + '10000000' # SABM + '00000000' # UDABM + '044400040000000a' # Sub-TLV 1: TE Metric = 10 + '0444000400000014' # Sub-TLV 2: TE Metric = 20 + ) + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 4, 'value': '0x10000000'}, + 'udabm': {'len': 4, 'value': '0x00000000'}, + 'sub_tlvs': [ + {'type': 'te_metric', 'value': 10}, + {'type': 'te_metric', 'value': 20} + ] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + def test_unpack_header_only(self): + """Test unpack ASLA TLV with header only (no masks, no sub-TLVs) + + Data: 00 00 00 00 + Header (4 bytes): + - SABM Length: 0x00 = 0 + - UDABM Length: 0x00 = 0 + - Reserved: 0x0000 + """ + data_bin = bytes.fromhex('00000000') + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 0, 'value': None}, + 'udabm': {'len': 0, 'value': None}, + 'sub_tlvs': [] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + def test_unpack_with_8_byte_sabm_4_byte_udabm(self): + """Test unpack ASLA TLV with mixed mask lengths (8-byte SABM, 4-byte UDABM) + + Data: 08 04 00 00 00 00 00 00 00 00 00 01 ff ff ff ff + Header (4 bytes): + - SABM Length: 0x08 = 8 + - UDABM Length: 0x04 = 4 + - Reserved: 0x0000 + SABM (8 bytes): 0x0000000000000001 + UDABM (4 bytes): 0xffffffff + """ + data_bin = bytes.fromhex('080400000000000000000001ffffffff') + + expected = { + 'type': 'ASLA', + 'value': { + 'sabm': {'len': 8, 'value': '0x0000000000000001'}, + 'udabm': {'len': 4, 'value': '0xffffffff'}, + 'sub_tlvs': [] + } + } + + self.assertEqual(expected, AppSpecLinkAttr.unpack(data_bin).dict()) + + # ==================== Exception/Abnormal Packet Tests ==================== + + def test_unpack_data_too_short(self): + """Test unpack ASLA TLV with data shorter than header (< 4 bytes) + + Data: 04 04 00 (only 3 bytes, header requires 4 bytes) + Expected: IndexError or struct.error + """ + data_bin = bytes.fromhex('040400') # Only 3 bytes + + with self.assertRaises((IndexError, Exception)): + AppSpecLinkAttr.unpack(data_bin) + + def test_unpack_empty_data(self): + """Test unpack ASLA TLV with empty data + + Data: (empty) + Expected: IndexError or struct.error + """ + data_bin = b'' + + with self.assertRaises((IndexError, Exception)): + AppSpecLinkAttr.unpack(data_bin) + + def test_unpack_sabm_length_exceeds_data(self): + """Test unpack ASLA TLV where SABM length exceeds available data + + Data: 08 00 00 00 12 34 56 78 (SABM length=8 but only 4 bytes available) + Header (4 bytes): + - SABM Length: 0x08 = 8 (but only 4 bytes follow) + - UDABM Length: 0x00 = 0 + - Reserved: 0x0000 + SABM: 0x12345678 (truncated, should be 8 bytes) + Expected: struct.error or incorrect parsing + """ + data_bin = bytes.fromhex('0800000012345678') # SABM length=8, but only 4 bytes + + with self.assertRaises((Exception,)): + AppSpecLinkAttr.unpack(data_bin) + + def test_unpack_udabm_length_exceeds_data(self): + """Test unpack ASLA TLV where UDABM length exceeds available data + + Data: 00 08 00 00 12 34 56 78 (UDABM length=8 but only 4 bytes available) + Header (4 bytes): + - SABM Length: 0x00 = 0 + - UDABM Length: 0x08 = 8 (but only 4 bytes follow) + - Reserved: 0x0000 + UDABM: 0x12345678 (truncated, should be 8 bytes) + Expected: struct.error or incorrect parsing + """ + data_bin = bytes.fromhex('0008000012345678') # UDABM length=8, but only 4 bytes + + with self.assertRaises((Exception,)): + AppSpecLinkAttr.unpack(data_bin) + + def test_unpack_sub_tlv_header_incomplete(self): + """Test unpack ASLA TLV where sub-TLV header is incomplete + + Data: 00 00 00 00 04 44 (sub-TLV type present but length missing) + Header (4 bytes): + - SABM Length: 0x00 = 0 + - UDABM Length: 0x00 = 0 + - Reserved: 0x0000 + Sub-TLV: incomplete header (only 2 bytes, needs 4 for type+length) + Expected: struct.error + """ + data_bin = bytes.fromhex('000000000444') # Incomplete sub-TLV header + + with self.assertRaises((Exception,)): + AppSpecLinkAttr.unpack(data_bin) + + +if __name__ == "__main__": + unittest.main() diff --git a/yabgp/tests/unit/message/attribute/linkstate/node/test_flex_algo_define.py b/yabgp/tests/unit/message/attribute/linkstate/node/test_flex_algo_define.py new file mode 100644 index 0000000..2fa9f5f --- /dev/null +++ b/yabgp/tests/unit/message/attribute/linkstate/node/test_flex_algo_define.py @@ -0,0 +1,201 @@ +# Copyright 2025 Cisco Systems, Inc. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" Test Flex Algorithm Definition TLV """ + +import unittest + +from yabgp.message.attribute.linkstate.node.flex_algo_define import ( + FlexAlgorithmDefine, + FlexAlgoExcludeAdminGroup, + FlexAlgoIncludeAnyAdminGroup, + FlexAlgoIncludeAllAdminGroup, + FlexAlgoDefinitionFlags, + FlexAlgoExcludeSRLG +) + + +class TestFlexAlgorithmDefine(unittest.TestCase): + """Test FlexAlgorithmDefine TLV (Type 1039)""" + + def test_unpack_without_sub_tlvs(self): + """Test unpack FAD TLV without sub-TLVs + + Data: 80 01 00 80 + - Flex-Algo: 0x80 = 128 + - Metric-Type: 0x01 = 1 (Min Unidirectional Link Delay) + - Calc-Type: 0x00 = 0 (SPF) + - Priority: 0x80 = 128 + """ + data_bin = bytes.fromhex('80010080') + expected = { + 'type': 'flex_algo_defn', + 'value': { + 'flex_algo': 128, + 'metric_type': 1, + 'calc_type': 0, + 'priority': 128, + 'sub_tlvs': [] + } + } + self.assertEqual(expected, FlexAlgorithmDefine.unpack(data_bin).dict()) + + def test_unpack_with_sub_tlvs(self): + """Test unpack FAD TLV with sub-TLVs + + Data: 80 01 00 80 04 13 00 02 80 00 + Fixed fields (4 bytes): + - Flex-Algo: 0x80 = 128 + - Metric-Type: 0x01 = 1 (Min Unidirectional Link Delay) + - Calc-Type: 0x00 = 0 (SPF) + - Priority: 0x80 = 128 + Sub-TLV 1043 (6 bytes): + - Type: 0x0413 = 1043 (Flex-Algo Definition Flags) + - Length: 0x0002 = 2 + - Value: 0x8000 (M-flag = 1) + """ + data_bin = bytes.fromhex('80010080' + '041300028000') + expected = { + 'type': 'flex_algo_defn', + 'value': { + 'flex_algo': 128, + 'metric_type': 1, + 'calc_type': 0, + 'priority': 128, + 'sub_tlvs': [ + { + 'type': 'flex_algo_defn_flags', + 'value': {'M': 1} + } + ] + } + } + self.assertEqual(expected, FlexAlgorithmDefine.unpack(data_bin).dict()) + + +class TestFlexAlgoExcludeAdminGroup(unittest.TestCase): + """Test FlexAlgoExcludeAdminGroup TLV (Type 1040)""" + + def test_unpack_single_group(self): + """Test unpack with single admin group + + Data: 00 00 00 01 + - Admin Group: 0x00000001 = 1 + """ + data_bin = bytes.fromhex('00000001') + expected = { + 'type': 'flex_algo_excl_admin_group', + 'value': [1] + } + self.assertEqual(expected, FlexAlgoExcludeAdminGroup.unpack(data_bin).dict()) + + def test_unpack_multiple_groups(self): + """Test unpack with multiple admin groups + + Data: 00 00 00 01 00 00 00 02 FF FF FF FF + - Admin Group 1: 0x00000001 = 1 + - Admin Group 2: 0x00000002 = 2 + - Admin Group 3: 0xFFFFFFFF = 4294967295 + """ + data_bin = bytes.fromhex('000000010000000200000003') + expected = { + 'type': 'flex_algo_excl_admin_group', + 'value': [1, 2, 3] + } + self.assertEqual(expected, FlexAlgoExcludeAdminGroup.unpack(data_bin).dict()) + + +class TestFlexAlgoIncludeAnyAdminGroup(unittest.TestCase): + """Test FlexAlgoIncludeAnyAdminGroup TLV (Type 1041)""" + + def test_unpack(self): + """Test unpack Include-Any Admin Group""" + data_bin = bytes.fromhex('00000001000000FF') + expected = { + 'type': 'flex_algo_incl_any_admin_group', + 'value': [1, 255] + } + self.assertEqual(expected, FlexAlgoIncludeAnyAdminGroup.unpack(data_bin).dict()) + + +class TestFlexAlgoIncludeAllAdminGroup(unittest.TestCase): + """Test FlexAlgoIncludeAllAdminGroup TLV (Type 1042)""" + + def test_unpack(self): + """Test unpack Include-All Admin Group""" + data_bin = bytes.fromhex('00000003') + expected = { + 'type': 'flex_algo_incl_all_admin_group', + 'value': [3] + } + self.assertEqual(expected, FlexAlgoIncludeAllAdminGroup.unpack(data_bin).dict()) + + +class TestFlexAlgoDefinitionFlags(unittest.TestCase): + """Test FlexAlgoDefinitionFlags TLV (Type 1043)""" + + def test_unpack_m_flag_not_set(self): + """Test unpack with M-flag not set + + Data: 00 00 + - Flags: 0x0000 + - M-flag: 0 + """ + data_bin = bytes.fromhex('0000') + expected = { + 'type': 'flex_algo_defn_flags', + 'value': {'M': 0} + } + self.assertEqual(expected, FlexAlgoDefinitionFlags.unpack(data_bin).dict()) + + def test_unpack_m_flag_set(self): + """Test unpack with M-flag set + + Data: 80 00 + - Flags: 0x8000 + - M-flag: 1 (bit 15 set) + """ + data_bin = bytes.fromhex('8000') + expected = { + 'type': 'flex_algo_defn_flags', + 'value': {'M': 1} + } + self.assertEqual(expected, FlexAlgoDefinitionFlags.unpack(data_bin).dict()) + + +class TestFlexAlgoExcludeSRLG(unittest.TestCase): + """Test FlexAlgoExcludeSRLG TLV (Type 1044)""" + + def test_unpack_single_srlg(self): + """Test unpack with single SRLG value""" + data_bin = bytes.fromhex('00000064') # SRLG = 100 + expected = { + 'type': 'flex_algo_excl_srlg', + 'value': [100] + } + self.assertEqual(expected, FlexAlgoExcludeSRLG.unpack(data_bin).dict()) + + def test_unpack_multiple_srlg(self): + """Test unpack with multiple SRLG values""" + data_bin = bytes.fromhex('000000640000012C000001F4') # SRLG = 100, 300, 500 + expected = { + 'type': 'flex_algo_excl_srlg', + 'value': [100, 300, 500] + } + self.assertEqual(expected, FlexAlgoExcludeSRLG.unpack(data_bin).dict()) + + +if __name__ == "__main__": + unittest.main()