diff --git a/flow/record/fieldtypes/net/__init__.py b/flow/record/fieldtypes/net/__init__.py index b603ad8e..5c46c053 100644 --- a/flow/record/fieldtypes/net/__init__.py +++ b/flow/record/fieldtypes/net/__init__.py @@ -1,12 +1,19 @@ from __future__ import annotations from flow.record.fieldtypes import string -from flow.record.fieldtypes.net.ip import IPAddress, IPNetwork, ipaddress, ipnetwork +from flow.record.fieldtypes.net.ip import ( + IPAddress, + IPNetwork, + ipaddress, + ipinterface, + ipnetwork, +) __all__ = [ "IPAddress", "IPNetwork", "ipaddress", + "ipinterface", "ipnetwork", ] diff --git a/flow/record/fieldtypes/net/ip.py b/flow/record/fieldtypes/net/ip.py index 9d699579..85965d89 100644 --- a/flow/record/fieldtypes/net/ip.py +++ b/flow/record/fieldtypes/net/ip.py @@ -2,10 +2,13 @@ from ipaddress import ( IPv4Address, + IPv4Interface, IPv4Network, IPv6Address, + IPv6Interface, IPv6Network, ip_address, + ip_interface, ip_network, ) from typing import Union @@ -15,16 +18,19 @@ _IPNetwork = Union[IPv4Network, IPv6Network] _IPAddress = Union[IPv4Address, IPv6Address] +_IPInterface = Union[IPv4Interface, IPv6Interface] +_ConversionTypes = Union[str, int, bytes] +_IPTypes = Union[_IPNetwork, _IPAddress, _IPInterface] class ipaddress(FieldType): - val = None + val: _IPAddress = None _type = "net.ipaddress" - def __init__(self, addr: str | int | bytes): + def __init__(self, addr: _ConversionTypes | _IPAddress): self.val = ip_address(addr) - def __eq__(self, b: str | int | bytes | _IPAddress) -> bool: + def __eq__(self, b: _ConversionTypes | _IPAddress) -> bool: try: return self.val == ip_address(b) except ValueError: @@ -53,13 +59,13 @@ def _unpack(data: int) -> ipaddress: class ipnetwork(FieldType): - val = None + val: _IPNetwork = None _type = "net.ipnetwork" - def __init__(self, addr: str | int | bytes): + def __init__(self, addr: _ConversionTypes | _IPNetwork): self.val = ip_network(addr) - def __eq__(self, b: str | int | bytes | _IPNetwork) -> bool: + def __eq__(self, b: _ConversionTypes | _IPNetwork) -> bool: try: return self.val == ip_network(b) except ValueError: @@ -98,6 +104,52 @@ def _pack(self) -> str: def _unpack(data: str) -> ipnetwork: return ipnetwork(data) + @property + def netmask(self) -> ipaddress: + return ipaddress(self.val.netmask) + + +class ipinterface(FieldType): + val: _IPInterface = None + _type = "net.ipinterface" + + def __init__(self, addr: _ConversionTypes | _IPTypes) -> None: + self.val = ip_interface(addr) + + def __eq__(self, b: _ConversionTypes | _IPTypes) -> bool: + try: + return self.val == ip_interface(b) + except ValueError: + return False + + def __hash__(self) -> int: + return hash(self.val) + + def __str__(self) -> str: + return str(self.val) + + def __repr__(self) -> str: + return f"{self._type}({str(self)!r})" + + @property + def ip(self) -> ipaddress: + return ipaddress(self.val.ip) + + @property + def network(self) -> ipnetwork: + return ipnetwork(self.val.network) + + @property + def netmask(self) -> ipaddress: + return ipaddress(self.val.netmask) + + def _pack(self) -> str: + return self.val.compressed + + @staticmethod + def _unpack(data: str) -> ipinterface: + return ipinterface(data) + # alias: net.IPAddress -> net.ipaddress # alias: net.IPNetwork -> net.ipnetwork diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 86c03fb4..01857bf2 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -68,7 +68,7 @@ def pack_obj(self, obj: Any) -> dict | str: "sha1": obj.sha1, "sha256": obj.sha256, } - if isinstance(obj, (fieldtypes.net.ipaddress, fieldtypes.net.ipnetwork)): + if isinstance(obj, (fieldtypes.net.ipaddress, fieldtypes.net.ipnetwork, fieldtypes.net.ipinterface)): return str(obj) if isinstance(obj, bytes): return base64.b64encode(obj).decode() diff --git a/flow/record/whitelist.py b/flow/record/whitelist.py index c1b41ced..0259f70d 100644 --- a/flow/record/whitelist.py +++ b/flow/record/whitelist.py @@ -24,6 +24,7 @@ "bytes", "record", "net.ipaddress", + "net.ipinterface", "net.ipnetwork", "net.IPAddress", "net.IPNetwork", diff --git a/tests/test_fieldtype_ip.py b/tests/test_fieldtype_ip.py index c0d7c4af..a1c224ee 100644 --- a/tests/test_fieldtype_ip.py +++ b/tests/test_fieldtype_ip.py @@ -128,6 +128,85 @@ def test_record_ipnetwork() -> None: assert "::1" not in data +def test_record_ipinterface() -> None: + TestRecord = RecordDescriptor( + "test/ipinterface", + [ + ("net.ipinterface", "interface"), + ], + ) + + # ipv4 + r = TestRecord("192.168.0.0/24") + assert r.interface == "192.168.0.0/24" + assert "bad.ip" not in r.interface.network + assert "192.168.0.1" in r.interface.network + assert isinstance(r.interface, net.ipinterface) + assert repr(r.interface) == "net.ipinterface('192.168.0.0/24')" + assert hash(r.interface) == hash(net.ipinterface("192.168.0.0/24")) + + r = TestRecord("192.168.1.1") + assert r.interface.ip == "192.168.1.1" + assert r.interface.network == "192.168.1.1/32" + assert r.interface == "192.168.1.1/32" + assert r.interface.netmask == "255.255.255.255" + + r = TestRecord("192.168.1.24/255.255.255.0") + assert r.interface == "192.168.1.24/24" + assert r.interface.ip == "192.168.1.24" + assert r.interface.network == "192.168.1.0/24" + assert r.interface.netmask == "255.255.255.0" + + # ipv6 - https://en.wikipedia.org/wiki/IPv6_address + r = TestRecord("::1") + assert r.interface == "::1" + assert r.interface == "::1/128" + + r = TestRecord("64:ff9b::2/96") + assert r.interface == "64:ff9b::2/96" + assert r.interface.ip == "64:ff9b::2" + assert r.interface.network == "64:ff9b::/96" + assert r.interface.netmask == "ffff:ffff:ffff:ffff:ffff:ffff::" + + # instantiate from different types + assert TestRecord(1).interface == "0.0.0.1/32" + assert TestRecord(0x7F0000FF).interface == "127.0.0.255/32" + assert TestRecord(b"\x7f\xff\xff\xff").interface == "127.255.255.255/32" + + # Test whether it functions in a set + data = {TestRecord(x).interface for x in ["192.168.0.0/24", "192.168.0.0/24", "::1", "::1"]} + assert len(data) == 2 + assert net.ipinterface("::1") in data + assert net.ipinterface("192.168.0.0/24") in data + assert "::1" not in data + + +def test_record_ipinterface_types() -> None: + TestRecord = RecordDescriptor( + "test/ipinterface", + [ + ( + "net.ipinterface", + "interface", + ) + ], + ) + + r = TestRecord("192.168.0.255/24") + _if = r.interface + assert isinstance(_if, net.ipinterface) + assert isinstance(_if.ip, net.ipaddress) + assert isinstance(_if.network, net.ipnetwork) + assert isinstance(_if.netmask, net.ipaddress) + + r = TestRecord("64:ff9b::/96") + _if = r.interface + assert isinstance(_if, net.ipinterface) + assert isinstance(_if.ip, net.ipaddress) + assert isinstance(_if.network, net.ipnetwork) + assert isinstance(_if.netmask, net.ipaddress) + + @pytest.mark.parametrize("PSelector", [Selector, CompiledSelector]) def test_selector_ipaddress(PSelector: type[Selector]) -> None: TestRecord = RecordDescriptor(